Repository: kafka
Updated Branches:
  refs/heads/trunk b63e41ea7 -> 4959444af
KAFKA-5236; Increase the block/buffer size when compressing with Snappy or Gzip

We had originally increased Snappy's block size as part of KAFKA-3704. However, we had some issues with excessive memory usage in the producer and we reverted it in 7c6ee8d5e. After more investigation, we fixed the underlying reason why memory usage seemed to grow much more than expected via KAFKA-3747 (included in 0.10.0.1). In 0.10.2, we changed the broker to use the same classes as the producer, and the broker's block size for Snappy was changed from 32 KB to 1 KB. As reported in KAFKA-5236, the on-disk size is, in some cases, 50% larger when the data is compressed with 1 KB instead of 32 KB as the block size.

As discussed in KAFKA-3704, it may be worth making this configurable and/or allocating the compression buffers from the producer pool. However, for 0.11.0.0, I think the simplest thing to do is to default to 32 KB for Snappy (the default if no block size is provided). I also increased the Gzip buffer size: 1 KB is too small, and the default is smaller still (512 bytes). 8 KB (the default buffer size for BufferedOutputStream) seemed like a reasonable default.
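For context, the gzip side of the change can be illustrated with a small standalone sketch using only `java.util.zip`. The class and helper names below are hypothetical, not the patch's code; the Snappy side is omitted because `SnappyOutputStream` comes from the external xerial dependency. Note that for gzip the buffer size only affects write granularity, not the compressed bytes themselves (unlike Snappy's block size, which affects the compression ratio):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipBufferSketch {
    // Hypothetical helper mirroring the idea in the patch: wrap the output
    // stream with gzip using an explicit 8 KB buffer rather than relying on
    // GZIPOutputStream's 512-byte default.
    static GZIPOutputStream wrapForOutput(ByteArrayOutputStream out) throws Exception {
        return new GZIPOutputStream(out, 8 * 1024);
    }

    public static void main(String[] args) throws Exception {
        byte[] payload = new byte[100_000];
        Arrays.fill(payload, (byte) 'a');

        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = wrapForOutput(compressed)) {
            gzip.write(payload);
        }

        // Round-trip to confirm the larger buffer changes nothing semantically.
        GZIPInputStream in = new GZIPInputStream(
                new ByteArrayInputStream(compressed.toByteArray()));
        byte[] decompressed = in.readAllBytes();
        if (!Arrays.equals(payload, decompressed))
            throw new AssertionError("round trip failed");
        System.out.println("compressed " + payload.length + " -> " + compressed.size() + " bytes");
    }
}
```

With the one-argument constructor (`new GZIPOutputStream(out)`) the output would be byte-identical; the larger buffer simply reduces the number of small writes to the underlying stream.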
Author: Ismael Juma <[email protected]>
Reviewers: Jason Gustafson <[email protected]>

Closes #3205 from ijuma/kafka-5236-snappy-block-size

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4959444a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4959444a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4959444a

Branch: refs/heads/trunk
Commit: 4959444afc927026f48f5c7d9babed7b9f1bea50
Parents: b63e41e
Author: Ismael Juma <[email protected]>
Authored: Fri Jun 2 21:20:02 2017 +0100
Committer: Ismael Juma <[email protected]>
Committed: Fri Jun 2 21:20:02 2017 +0100

----------------------------------------------------------------------
 .../kafka/common/record/CompressionType.java      | 17 +++++++++--------
 .../kafka/common/record/MemoryRecordsBuilder.java |  4 +---
 .../kafka/common/record/CompressionTypeTest.java  |  4 ++--
 .../common/record/SimpleLegacyRecordTest.java     |  2 +-
 .../kafka/message/MessageCompressionTest.scala    | 12 ++++++------
 docs/upgrade.html                                 |  6 ++++++
 6 files changed, 25 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4959444a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 742493b..16d6e01 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -35,7 +35,7 @@ import java.util.zip.GZIPOutputStream;
 public enum CompressionType {
     NONE(0, "none", 1.0f) {
         @Override
-        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) {
+        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
             return buffer;
         }
 
@@ -47,9 +47,10 @@ public enum CompressionType {
     GZIP(1, "gzip", 1.0f) {
         @Override
-        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) {
+        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
             try {
-                return new GZIPOutputStream(buffer, bufferSize);
+                // GZIPOutputStream has a default buffer size of 512 bytes, which is too small
+                return new GZIPOutputStream(buffer, 8 * 1024);
             } catch (Exception e) {
                 throw new KafkaException(e);
             }
@@ -67,9 +68,9 @@ public enum CompressionType {
     SNAPPY(2, "snappy", 1.0f) {
         @Override
-        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) {
+        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
             try {
-                return (OutputStream) SnappyConstructors.OUTPUT.invoke(buffer, bufferSize);
+                return (OutputStream) SnappyConstructors.OUTPUT.invoke(buffer);
             } catch (Throwable e) {
                 throw new KafkaException(e);
             }
@@ -87,7 +88,7 @@ public enum CompressionType {
     LZ4(3, "lz4", 1.0f) {
         @Override
-        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) {
+        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
             try {
                 return new KafkaLZ4BlockOutputStream(buffer, messageVersion == RecordBatch.MAGIC_VALUE_V0);
             } catch (Throwable e) {
@@ -124,7 +125,7 @@ public enum CompressionType {
      * write to the underlying buffer in the given {@link ByteBufferOutputStream} after the compressed data has been written.
      * In the event that the buffer needs to be expanded while writing the data, access to the underlying buffer needs to be preserved.
      */
-    public abstract OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion, int bufferSize);
+    public abstract OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion);
 
     /**
      * Wrap buffer with an InputStream that will decompress data with this CompressionType.
@@ -178,7 +179,7 @@ public enum CompressionType {
         static final MethodHandle INPUT = findConstructor("org.xerial.snappy.SnappyInputStream",
                 MethodType.methodType(void.class, InputStream.class));
         static final MethodHandle OUTPUT = findConstructor("org.xerial.snappy.SnappyOutputStream",
-                MethodType.methodType(void.class, OutputStream.class, Integer.TYPE));
+                MethodType.methodType(void.class, OutputStream.class));
     }
 
     private static MethodHandle findConstructor(String className, MethodType methodType) {


http://git-wip-us.apache.org/repos/asf/kafka/blob/4959444a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 89d314d..cd9ba0e 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -38,7 +38,6 @@ import static org.apache.kafka.common.utils.Utils.wrapNullable;
  */
 public class MemoryRecordsBuilder {
     private static final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
-    private static final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024;
 
     private final TimestampType timestampType;
     private final CompressionType compressionType;
@@ -124,8 +123,7 @@ public class MemoryRecordsBuilder {
         }
 
         this.bufferStream = bufferStream;
-        this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic,
-                COMPRESSION_DEFAULT_BUFFER_SIZE));
+        this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
     }
 
     /**


http://git-wip-us.apache.org/repos/asf/kafka/blob/4959444a/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java b/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
index fe196c8..d76a577 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
@@ -30,7 +30,7 @@ public class CompressionTypeTest {
     public void testLZ4FramingMagicV0() {
         ByteBuffer buffer = ByteBuffer.allocate(256);
         KafkaLZ4BlockOutputStream out = (KafkaLZ4BlockOutputStream) CompressionType.LZ4.wrapForOutput(
-                new ByteBufferOutputStream(buffer), RecordBatch.MAGIC_VALUE_V0, 256);
+                new ByteBufferOutputStream(buffer), RecordBatch.MAGIC_VALUE_V0);
         assertTrue(out.useBrokenFlagDescriptorChecksum());
 
         buffer.rewind();
@@ -44,7 +44,7 @@ public class CompressionTypeTest {
     public void testLZ4FramingMagicV1() {
         ByteBuffer buffer = ByteBuffer.allocate(256);
         KafkaLZ4BlockOutputStream out = (KafkaLZ4BlockOutputStream) CompressionType.LZ4.wrapForOutput(
-                new ByteBufferOutputStream(buffer), RecordBatch.MAGIC_VALUE_V1, 256);
+                new ByteBufferOutputStream(buffer), RecordBatch.MAGIC_VALUE_V1);
         assertFalse(out.useBrokenFlagDescriptorChecksum());
 
         buffer.rewind();


http://git-wip-us.apache.org/repos/asf/kafka/blob/4959444a/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java
index b409af6..5f578a8 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java
@@ -48,7 +48,7 @@ public class SimpleLegacyRecordTest {
     public void testCompressedIterationWithEmptyRecords() throws Exception {
         ByteBuffer emptyCompressedValue = ByteBuffer.allocate(64);
         OutputStream gzipOutput = CompressionType.GZIP.wrapForOutput(new ByteBufferOutputStream(emptyCompressedValue),
-                RecordBatch.MAGIC_VALUE_V1, 64);
+                RecordBatch.MAGIC_VALUE_V1);
         gzipOutput.close();
         emptyCompressedValue.flip();


http://git-wip-us.apache.org/repos/asf/kafka/blob/4959444a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
index b0913db..0e74f04 100644
--- a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
@@ -28,11 +28,11 @@ class MessageCompressionTest extends JUnitSuite {
   @Test
   def testSimpleCompressDecompress() {
     val codecs = mutable.ArrayBuffer[CompressionCodec](GZIPCompressionCodec)
-    if(isSnappyAvailable)
+    if (isSnappyAvailable)
       codecs += SnappyCompressionCodec
-    if(isLZ4Available)
+    if (isLZ4Available)
       codecs += LZ4CompressionCodec
-    for(codec <- codecs)
+    for (codec <- codecs)
       testSimpleCompressDecompress(codec)
   }
@@ -48,10 +48,10 @@ class MessageCompressionTest extends JUnitSuite {
     testCompressSize(GZIPCompressionCodec, messages, 396)
 
-    if(isSnappyAvailable)
-      testCompressSize(SnappyCompressionCodec, messages, 1063)
+    if (isSnappyAvailable)
+      testCompressSize(SnappyCompressionCodec, messages, 502)
 
-    if(isLZ4Available)
+    if (isLZ4Available)
       testCompressSize(LZ4CompressionCodec, messages, 387)
   }
http://git-wip-us.apache.org/repos/asf/kafka/blob/4959444a/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index aae058b..ca905ae 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -60,6 +60,12 @@
     <li>The <code>offsets.topic.replication.factor</code> broker config is now enforced upon auto topic creation. Internal
         auto topic creation will fail with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets this
         replication factor requirement.</li>
+    <li>When compressing data with snappy, the producer and broker will use the compression scheme's default block size (2 x 32 KB)
+        instead of 1 KB in order to improve the compression ratio. There have been reports of data compressed with the smaller
+        block size being 50% larger than when compressed with the larger block size. For the snappy case, a producer with 5000
+        partitions will require an additional 315 MB of JVM heap.</li>
+    <li>Similarly, when compressing data with gzip, the producer and broker will use 8 KB instead of 1 KB as the buffer size. The
+        default for gzip is excessively low (512 bytes).</li>
     <li>The broker configuration <code>max.message.bytes</code> now applies to the total size of a batch of messages.
         Previously the setting applied to batches of compressed messages, or to non-compressed messages individually. In practice,
        the change is minor since a message batch may consist of only a single message, so the limitation on the size of
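The "additional 315 MB of JVM heap" figure in the upgrade note can be sanity-checked with a quick calculation. This sketch assumes, per the note's "(2 x 32 KB)" wording, that each open Snappy stream holds two block-sized internal buffers, one stream per partition:

```java
public class SnappyHeapEstimate {
    public static void main(String[] args) {
        // Per the upgrade note, a Snappy stream holds two internal buffers
        // (input and output), each of the block size: 2 x 32 KB = 64 KB.
        int blockSize = 32 * 1024;
        int buffersPerStream = 2;
        int partitions = 5000;  // figure used in the upgrade note

        long bytes = (long) partitions * buffersPerStream * blockSize;
        double megabytes = bytes / (1024.0 * 1024.0);
        // 5000 * 64 KB = 312.5 MB, i.e. roughly the quoted 315 MB.
        System.out.printf("~%.1f MB%n", megabytes);
    }
}
```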
