Repository: kafka Updated Branches: refs/heads/0.8.2 fcfeba73b -> da2c7cd50
KAFKA-1471 Add producer unit tests for LZ4 and LZ4HC compression codecs; patched by James Oliver; reviewed by Neha Narkhede Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/da2c7cd5 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/da2c7cd5 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/da2c7cd5 Branch: refs/heads/0.8.2 Commit: da2c7cd50e3fea4aa44b0804d729c01990dfb2c1 Parents: fcfeba7 Author: Ewen Cheslack-Postava <[email protected]> Authored: Sun Oct 12 16:39:39 2014 -0700 Committer: Neha Narkhede <[email protected]> Committed: Sun Oct 12 16:39:44 2014 -0700 ---------------------------------------------------------------------- .../java/org/apache/kafka/clients/producer/ProducerConfig.java | 4 ++-- core/src/main/scala/kafka/tools/ConsoleProducer.scala | 4 ++-- core/src/main/scala/kafka/tools/PerfConfig.scala | 2 +- .../scala/integration/kafka/api/ProducerCompressionTest.scala | 3 +++ core/src/test/scala/unit/kafka/message/MessageTest.scala | 2 +- 5 files changed, 9 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/da2c7cd5/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index f9de4af..9e2e280 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -151,8 +151,8 @@ public class ProducerConfig extends AbstractConfig { /** <code>compression.type</code> */ public static final String COMPRESSION_TYPE_CONFIG = "compression.type"; - private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid " + " values are <code>none</code>, <code>gzip</code>, or <code>snappy</code>. Compression is of full batches of data, " - + " so the efficacy of batching will also impact the compression ratio (more batching means better compression)."; + private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid " + " values are <code>none</code>, <code>gzip</code>, <code>snappy</code>, <code>lz4</code>, or <code>lz4hc</code>. " + + "Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression)."; /** <code>metrics.sample.window.ms</code> */ public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms"; http://git-wip-us.apache.org/repos/asf/kafka/blob/da2c7cd5/core/src/main/scala/kafka/tools/ConsoleProducer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index 8e9ba0b..b024a69 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -113,8 +113,8 @@ object ConsoleProducer { .describedAs("broker-list") .ofType(classOf[String]) val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.") - val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'gzip' or 'snappy'." + - "If specified without value, than it defaults to 'gzip'") + val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'lz4hc'." + + "If specified without value, then it defaults to 'gzip'") .withOptionalArg() .describedAs("compression-codec") .ofType(classOf[String]) http://git-wip-us.apache.org/repos/asf/kafka/blob/da2c7cd5/core/src/main/scala/kafka/tools/PerfConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/PerfConfig.scala b/core/src/main/scala/kafka/tools/PerfConfig.scala index 129cc01..c720029 100644 --- a/core/src/main/scala/kafka/tools/PerfConfig.scala +++ b/core/src/main/scala/kafka/tools/PerfConfig.scala @@ -53,7 +53,7 @@ class PerfConfig(args: Array[String]) { .defaultsTo(200) val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed") .withRequiredArg - .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2") + .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2, LZ4CompressionCodec as 3, LZ4HCCompressionCodec as 4") .ofType(classOf[java.lang.Integer]) .defaultsTo(0) val helpOpt = parser.accepts("help", "Print usage.") http://git-wip-us.apache.org/repos/asf/kafka/blob/da2c7cd5/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala index 17e2c6e..c954851 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala @@ -121,8 +121,11 @@ object ProducerCompressionTest { @Parameters def parameters: Collection[Array[String]] = { val list = new ArrayList[Array[String]]() + list.add(Array("none")) list.add(Array("gzip")) list.add(Array("snappy")) + list.add(Array("lz4")) + list.add(Array("lz4hc")) list } } http://git-wip-us.apache.org/repos/asf/kafka/blob/da2c7cd5/core/src/test/scala/unit/kafka/message/MessageTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/message/MessageTest.scala b/core/src/test/scala/unit/kafka/message/MessageTest.scala index 4837585..958c1a6 100644 --- a/core/src/test/scala/unit/kafka/message/MessageTest.scala +++ b/core/src/test/scala/unit/kafka/message/MessageTest.scala @@ -39,7 +39,7 @@ class MessageTest extends JUnitSuite { def setUp(): Unit = { val keys = Array(null, "key".getBytes, "".getBytes) val vals = Array("value".getBytes, "".getBytes, null) - val codecs = Array(NoCompressionCodec, GZIPCompressionCodec) + val codecs = Array(NoCompressionCodec, GZIPCompressionCodec, SnappyCompressionCodec, LZ4CompressionCodec, LZ4HCCompressionCodec) for(k <- keys; v <- vals; codec <- codecs) messages += new MessageTestVal(k, v, codec, new Message(v, k, codec)) }
