Repository: kafka Updated Branches: refs/heads/trunk b9b2cfc28 -> ca2979f84
MINOR: Suppress ProducerConfig warning in MirrorMaker Though MirrorMaker uses the `producer.type` value of the producer properties, ProducerConfig show the warning: `The configuration 'producer.type' was supplied but isn't a known config.` Author: Shun Takebayashi <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #2676 from takebayashi/suppress-mirrormaker-warning Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ca2979f8 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ca2979f8 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ca2979f8 Branch: refs/heads/trunk Commit: ca2979f847ead39b4480f31085c5cf26bb102080 Parents: b9b2cfc Author: Shun Takebayashi <[email protected]> Authored: Tue Apr 4 00:16:30 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Tue Apr 4 00:16:35 2017 +0100 ---------------------------------------------------------------------- core/src/main/scala/kafka/tools/MirrorMaker.scala | 8 ++++---- .../integration/kafka/tools/MirrorMakerIntegrationTest.scala | 3 +-- 2 files changed, 5 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/ca2979f8/core/src/main/scala/kafka/tools/MirrorMaker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index d359c1a..5d88b4e 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -225,6 +225,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { // create producer val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt)) + val sync = producerProps.getProperty("producer.type", "async").equals("sync") + producerProps.remove("producer.type") // Defaults to no data loss settings. maybeSetDefaultProperty(producerProps, ProducerConfig.RETRIES_CONFIG, Int.MaxValue.toString) maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString) @@ -233,7 +235,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { // Always set producer key and value serializer to ByteArraySerializer. producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") - producer = new MirrorMakerProducer(producerProps) + producer = new MirrorMakerProducer(sync, producerProps) // Create consumers val mirrorMakerConsumers = if (useOldConsumer) { @@ -696,9 +698,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } } - private[tools] class MirrorMakerProducer(val producerProps: Properties) { - - val sync = producerProps.getProperty("producer.type", "async").equals("sync") + private[tools] class MirrorMakerProducer(val sync: Boolean, val producerProps: Properties) { val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps) http://git-wip-us.apache.org/repos/asf/kafka/blob/ca2979f8/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala index 465e8de..b7b1a12 100644 --- a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala @@ -41,10 +41,9 @@ class MirrorMakerIntegrationTest extends KafkaServerTestHarness { val producerProps = new Properties producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) - producerProps.put("producer.type", "sync") producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer]) producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer]) - val producer = new MirrorMakerProducer(producerProps) + val producer = new MirrorMakerProducer(true, producerProps) MirrorMaker.producer = producer MirrorMaker.producer.send(new ProducerRecord(topic, msg.getBytes())) MirrorMaker.producer.close()
