Repository: curator Updated Branches: refs/heads/CURATOR-487 [created] ec7020861
Make GzipCompressionProvider to recycle Deflaters and Inflaters in pools Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/6457f267 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/6457f267 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/6457f267 Branch: refs/heads/CURATOR-487 Commit: 6457f267ad713854fb89d32a56f266202cf82cd5 Parents: eebff92 Author: Roman Leventov <[email protected]> Authored: Fri Nov 30 18:49:10 2018 +0100 Committer: Roman Leventov <[email protected]> Committed: Fri Nov 30 18:55:36 2018 +0100 ---------------------------------------------------------------------- .../framework/imps/GzipCompressionProvider.java | 307 +++++++++++++++++-- .../imps/GzipCompressionProviderTest.java | 88 ++++++ 2 files changed, 373 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/6457f267/curator-framework/src/main/java/org/apache/curator/framework/imps/GzipCompressionProvider.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GzipCompressionProvider.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GzipCompressionProvider.java index 7b35c37..74d0347 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GzipCompressionProvider.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GzipCompressionProvider.java @@ -18,47 +18,310 @@ */ package org.apache.curator.framework.imps; +import com.google.common.annotations.VisibleForTesting; import org.apache.curator.framework.api.CompressionProvider; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.zip.*; public class GzipCompressionProvider implements CompressionProvider { + // This class re-implements java.util.zip.GZIPInputStream and GZIPOutputStream functionality to avoid + // creation many finalized Deflater and Inflater objects on heap (see + // https://issues.apache.org/jira/browse/CURATOR-487). Even when Curator's minimum supported Java version becomes + // no less than Java 12, where finalize() methods are removed in Deflater and Inflater classes and instead they + // are phantom-referenced via Cleaner, it still makes sense to avoid GZIPInputStream and GZIPOutputStream because + // phantom references are also not entirely free for GC algorithms, and also to allocate less garbage and make + // less unnecessary data copies. + + private static final int MAX_SAFE_JAVA_BYTE_ARRAY_SIZE = Integer.MAX_VALUE - 128; + + /** GZIP header magic number. */ + private static final int GZIP_MAGIC = 0x8b1f; + + /** See {@code java.util.zip.GZIPOutputStream.writeHeader()} */ + private static final byte[] GZIP_HEADER = new byte[] { + (byte) GZIP_MAGIC, // Magic number (byte 0) + (byte) (GZIP_MAGIC >> 8), // Magic number (byte 1) + Deflater.DEFLATED, // Compression method (CM) + 0, // Flags (FLG) + 0, // Modification time MTIME (byte 0) + 0, // Modification time MTIME (byte 1) + 0, // Modification time MTIME (byte 2) + 0, // Modification time MTIME (byte 3) + 0, // Extra flags (XFLG) + 0 // Operating system (OS) + }; + + /** GZip flags, {@link #GZIP_HEADER}'s 4th byte */ + private static final int FHCRC = 1 << 1; + private static final int FEXTRA = 1 << 2; + private static final int FNAME = 1 << 3; + private static final int FCOMMENT = 1 << 4; + + private static final int GZIP_HEADER_SIZE = GZIP_HEADER.length; + + /** 32-bit CRC and uncompressed data size */ + private static final int GZIP_TRAILER_SIZE = Integer.BYTES + Integer.BYTES; + + /** + * Since Deflaters and Inflaters are acquired and returned to the pools in try-finally blocks that are free of + * blocking calls themselves, it's not expected that the number of objects in the pools could exceed the number of + * hardware threads on the machine much. Therefore it's accepted to have simple pools of strongly-referenced + * objects. + */ + private static final ConcurrentLinkedQueue<Deflater> DEFLATER_POOL = new ConcurrentLinkedQueue<>(); + private static final ConcurrentLinkedQueue<Inflater> INFLATER_POOL = new ConcurrentLinkedQueue<>(); + + /** The value verified in GzipCompressionProviderTest.testEmpty() */ + private static final byte[] COMPRESSED_EMPTY_BYTES = new byte[] { + 31, -117, 8, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0 + }; + + private static Deflater acquireDeflater() + { + Deflater deflater = DEFLATER_POOL.poll(); + if ( deflater == null ) + { + // Using the same settings as in GZIPOutputStream constructor + deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true); + } + return deflater; + } + + private static Inflater acquireInflater() + { + Inflater inflater = INFLATER_POOL.poll(); + if ( inflater == null ) + { + // Using the same nowrap setting as GZIPInputStream constructor + inflater = new Inflater(true); + } + return inflater; + } + @Override - public byte[] compress(String path, byte[] data) throws Exception + public byte[] compress(String path, byte[] data) { - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - GZIPOutputStream out = new GZIPOutputStream(bytes); + if ( data.length == 0 ) + { + // clone() because clients could update the array + return COMPRESSED_EMPTY_BYTES.clone(); + } + return doCompress(data); + } + + @VisibleForTesting + static byte[] doCompress(byte[] data) + { + byte[] result = Arrays.copyOf(GZIP_HEADER, conservativeGZippedSizeEstimate(data.length)); + Deflater deflater = acquireDeflater(); try { - out.write(data); - out.finish(); + deflater.setInput(data); + deflater.finish(); + int offset = GZIP_HEADER_SIZE; + while ( true ) + { + int available = result.length - GZIP_TRAILER_SIZE - offset; + int numCompressedBytes = deflater.deflate(result, offset, available); + offset += numCompressedBytes; + if ( deflater.finished() ) + { + break; + } + // `+ 1` to ensure some growth on the sizes of 0 or 1 + int newResultLength = result.length + (result.length / 2) + 1; + result = Arrays.copyOf(result, newResultLength); + } + // Write GZip trailer + CRC32 crc = new CRC32(); + crc.update(data, 0, data.length); + writeLittleEndianInt(result, offset, (int) crc.getValue()); + writeLittleEndianInt(result, offset + 4, data.length); + int endOffset = offset + GZIP_TRAILER_SIZE; + if ( result.length != endOffset ) + { + result = Arrays.copyOf(result, endOffset); + } + return result; } finally { - out.close(); + deflater.reset(); + DEFLATER_POOL.add(deflater); } - return bytes.toByteArray(); } - @Override - public byte[] decompress(String path, byte[] compressedData) throws Exception + private static int conservativeGZippedSizeEstimate(int dataSize) { - ByteArrayOutputStream bytes = new ByteArrayOutputStream(compressedData.length); - GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressedData)); + int conservativeCompressedDataSizeEstimate; + if ( dataSize < 512 ) + { + // Assuming DEFLATE doesn't compress small data well + conservativeCompressedDataSizeEstimate = dataSize; + } + else + { + // Assuming pretty bad 2:1 compression ratio + conservativeCompressedDataSizeEstimate = Math.max(512, dataSize / 2); + } + return GZIP_HEADER_SIZE + conservativeCompressedDataSizeEstimate + GZIP_TRAILER_SIZE; + } + + private static void writeLittleEndianInt(byte[] b, int offset, int v) + { + b[offset] = (byte) v; + b[offset + 1] = (byte) (v >> 8); + b[offset + 2] = (byte) (v >> 16); + b[offset + 3] = (byte) (v >> 24); + } + + @Override + public byte[] decompress(String path, byte[] gzippedDataBytes) throws IOException { + if ( Arrays.equals(gzippedDataBytes, COMPRESSED_EMPTY_BYTES) ) + { + // Allocating a new array instead of creating a static constant because clients may somehow depend on the + // identity of the returned arrays + return new byte[0]; + } + ByteBuffer gzippedData = ByteBuffer.wrap(gzippedDataBytes); + gzippedData.order(ByteOrder.LITTLE_ENDIAN); + int headerSize = readGzipHeader(gzippedData); + if ( gzippedDataBytes.length < headerSize + GZIP_TRAILER_SIZE ) + { + throw new EOFException("Too short GZipped data"); + } + int compressedDataSize = gzippedDataBytes.length - headerSize - GZIP_TRAILER_SIZE; + // Assuming 3:1 compression ratio. Intentionally a more generous estimation than in + // conservativeGZippedSizeEstimate() to reduce the probability of result array reallocation. + int initialResultLength = (int) Math.min(compressedDataSize * 3L, MAX_SAFE_JAVA_BYTE_ARRAY_SIZE); + byte[] result = new byte[initialResultLength]; + Inflater inflater = acquireInflater(); try { - byte[] buffer = new byte[compressedData.length]; - for(;;) + inflater.setInput(gzippedDataBytes, headerSize, compressedDataSize); + CRC32 crc = new CRC32(); + int offset = 0; + while (true) { - int bytesRead = in.read(buffer, 0, buffer.length); - if ( bytesRead < 0 ) + int numDecompressedBytes; + try { + numDecompressedBytes = inflater.inflate(result, offset, result.length - offset); + } catch (DataFormatException e) { + String s = e.getMessage(); + throw new ZipException(s != null ? s : "Invalid ZLIB data format"); + } + crc.update(result, offset, numDecompressedBytes); + offset += numDecompressedBytes; + if ( inflater.finished() || inflater.needsDictionary() ) { break; } - bytes.write(buffer, 0, bytesRead); + else if ( inflater.needsInput() ) + { + throw new ZipException("Corrupt GZipped data"); + } + // Inflater's contract doesn't say whether it's able to be finished() without returning 0 from inflate() + // call, so the additional `numDecompressedBytes == 0` condition ensures that we did another cycle and + // definitely need to inflate some more bytes. + if ( result.length == MAX_SAFE_JAVA_BYTE_ARRAY_SIZE && numDecompressedBytes == 0 ) + { + throw new OutOfMemoryError("Unable to uncompress that much data into a single byte[] array"); + } + // `+ 1` to ensure some growth on the sizes of 0 or 1 + int newResultLength = + (int) Math.min((long) result.length + (result.length / 2) + 1, MAX_SAFE_JAVA_BYTE_ARRAY_SIZE); + if ( result.length != newResultLength ) + { + result = Arrays.copyOf(result, newResultLength); + } + } + if ( inflater.getRemaining() != 0 ) + { + throw new ZipException("Expected just one GZip block, without garbage in the end"); } + int checksum = gzippedData.getInt(gzippedDataBytes.length - GZIP_TRAILER_SIZE); + int numUncompressedBytes = gzippedData.getInt(gzippedDataBytes.length - Integer.BYTES); + if ( checksum != (int) crc.getValue() || numUncompressedBytes != offset ) + { + throw new ZipException("Corrupt GZIP trailer"); + } + if ( result.length != offset ) + { + result = Arrays.copyOf(result, offset); + } + return result; } finally { - in.close(); + inflater.reset(); + INFLATER_POOL.add(inflater); + } + } + + /** + * Returns the header size + */ + private static int readGzipHeader(ByteBuffer gzippedData) throws IOException + { + try { + return doReadHeader(gzippedData); + } catch (BufferUnderflowException e) { + throw new EOFException(); + } + } + + private static int doReadHeader(ByteBuffer gzippedData) throws IOException { + if ( gzippedData.getChar() != GZIP_MAGIC ) + { + throw new ZipException("Not in GZip format"); + } + if ( gzippedData.get() != Deflater.DEFLATED ) + { + throw new ZipException("Unsupported compression method"); + } + int flags = gzippedData.get(); + // Skip MTIME, XFL, and OS fields + skip(gzippedData, Integer.BYTES + Byte.BYTES + Byte.BYTES); + if ( (flags & FEXTRA) != 0 ) + { + int extraBytes = gzippedData.getChar(); + skip(gzippedData, extraBytes); + } + if ( (flags & FNAME) != 0 ) + { + skipZeroTerminatedString(gzippedData); + } + if ( (flags & FCOMMENT) != 0 ) + { + skipZeroTerminatedString(gzippedData); + } + if ( (flags & FHCRC) != 0 ) + { + CRC32 crc = new CRC32(); + crc.update(gzippedData.array(), 0, gzippedData.position()); + if ( gzippedData.getChar() != (char) crc.getValue() ) + { + throw new ZipException("Corrupt GZIP header"); + } + } + return gzippedData.position(); + } + + private static void skip(ByteBuffer gzippedData, int skipBytes) throws IOException + { + try { + gzippedData.position(gzippedData.position() + skipBytes); + } catch (IllegalArgumentException e) { + throw new EOFException(); + } + } + + private static void skipZeroTerminatedString(ByteBuffer gzippedData) + { + while (gzippedData.get() != 0) { + // loop } - return bytes.toByteArray(); } } http://git-wip-us.apache.org/repos/asf/curator/blob/6457f267/curator-framework/src/test/java/org/apache/curator/framework/imps/GzipCompressionProviderTest.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/GzipCompressionProviderTest.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/GzipCompressionProviderTest.java new file mode 100644 index 0000000..11b8c63 --- /dev/null +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/GzipCompressionProviderTest.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.imps; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.Arrays; + +public class GzipCompressionProviderTest +{ + @Test + public void testSimple() throws IOException + { + GzipCompressionProvider provider = new GzipCompressionProvider(); + byte[] data = "Hello, world!".getBytes(); + byte[] compressedData = provider.compress(null, data); + byte[] decompressedData = provider.decompress(null, compressedData); + Assert.assertTrue(Arrays.equals(decompressedData, data)); + } + + @Test + public void testEmpty() throws IOException + { + GzipCompressionProvider provider = new GzipCompressionProvider(); + byte[] compressedData = provider.compress(null, new byte[0]); + byte[] compressedData2 = GzipCompressionProvider.doCompress(new byte[0]); + // Ensures GzipCompressionProvider.COMPRESSED_EMPTY_BYTES value is valid + Assert.assertTrue(Arrays.equals(compressedData, compressedData2)); + byte[] decompressedData = provider.decompress(null, compressedData); + Assert.assertEquals(0, decompressedData.length); + } + + /** + * This test ensures that in the face of corrupt data, specifically IOException is thrown, rather some other kind + * of runtime exception. Users of {@link GzipCompressionProvider#decompress(String, byte[])} may depend on this. + */ + @Test + public void testDecompressCorrupt() + { + GzipCompressionProvider provider = new GzipCompressionProvider(); + try { + provider.decompress(null, new byte[100]); + Assert.fail("Expected IOException"); + } catch (IOException ignore) { + // expected + } + byte[] compressedData = provider.compress(null, new byte[0]); + for (int i = 0; i < compressedData.length; i++) + { + try { + provider.decompress(null, Arrays.copyOf(compressedData, i)); + } catch (IOException ignore) { + // expected + } + for (int change = 1; change < 256; change++) + { + byte b = compressedData[i]; + compressedData[i] = (byte) (b + change); + try { + provider.decompress(null, compressedData); + // No exception is OK + } catch (IOException ignore) { + // expected + } + // reset value back + compressedData[i] = b; + } + } + } +}
