Repository: samza Updated Branches: refs/heads/master 05a02eed1 -> 8b52e8aec
SAMZA-586; disable compressed messages when sending to a store's changelog topic from a kafka system Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8b52e8ae Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8b52e8ae Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8b52e8ae Branch: refs/heads/master Commit: 8b52e8aecbca2915d50f0ae3600ceec799fac60f Parents: 05a02ee Author: Chris Riccomini <[email protected]> Authored: Thu Mar 19 09:25:29 2015 -0700 Committer: Chris Riccomini <[email protected]> Committed: Thu Mar 19 09:25:29 2015 -0700 ---------------------------------------------------------------------- .../org/apache/samza/config/StorageConfig.scala | 16 +++++++- .../apache/samza/config/TestStorageConfig.scala | 40 ++++++++++++++++++++ .../kafka/KafkaCheckpointManagerFactory.scala | 4 +- .../samza/system/kafka/KafkaSystemFactory.scala | 12 +++++- .../system/kafka/TestKafkaSystemFactory.scala | 13 +++++++ 5 files changed, 81 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/8b52e8ae/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala index f977b8b..be3f106 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala @@ -20,8 +20,8 @@ package org.apache.samza.config import scala.collection.JavaConversions._ - import org.apache.samza.util.Logging +import org.apache.samza.util.Util object StorageConfig { // stream config constants @@ -43,4 +43,18 @@ class StorageConfig(config: Config) extends ScalaMapConfig(config) with Logging val conf = config.subset("stores.", true) conf.keys.filter(k => k.endsWith(".factory")).map(k => k.substring(0, k.length - ".factory".length)).toSeq } + + /** + * Helper method to check if a system has a changelog attached to it. + */ + def isChangelogSystem(systemName: String) = { + config + .getStoreNames + // Get changelogs for all stores in the format of "system.stream" + .map(getChangelogStream(_)) + .filter(_.isDefined) + // Convert "system.stream" to systemName + .map(systemStreamName => Util.getSystemStreamFromNames(systemStreamName.get).getSystem) + .contains(systemName) + } } http://git-wip-us.apache.org/repos/asf/samza/blob/8b52e8ae/samza-core/src/test/scala/org/apache/samza/config/TestStorageConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/config/TestStorageConfig.scala b/samza-core/src/test/scala/org/apache/samza/config/TestStorageConfig.scala new file mode 100644 index 0000000..81a35ec --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/config/TestStorageConfig.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.config + +import scala.collection.JavaConversions._ +import org.apache.samza.config.StorageConfig._ +import org.junit.Assert.assertFalse +import org.junit.Assert.assertTrue +import org.junit.Test + +class TestStorageConfig { + @Test + def testIsChangelogSystem { + val configMap = Map[String, String]( + FACTORY.format("system1") -> "some.factory.Class", + CHANGELOG_STREAM.format("system1") -> "system1.stream1", + FACTORY.format("system2") -> "some.factory.Class") + val config = new MapConfig(configMap) + assertFalse(config.isChangelogSystem("system3")) + assertFalse(config.isChangelogSystem("system2")) + assertTrue(config.isChangelogSystem("system1")) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/8b52e8ae/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala index 7fc6d89..3dfa26a 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala @@ -43,8 +43,8 @@ object KafkaCheckpointManagerFactory { val INJECTED_PRODUCER_PROPERTIES = Map( "acks" -> "all", // Forcibly disable compression because Kafka doesn't support compression - // on log compacted topics. Details in SAMZA-393. - "compression.codec" -> "none") + // on log compacted topics. Details in SAMZA-586. + "compression.type" -> "none") // Set the checkpoint topic configs to have a very small segment size and // enable log compaction. This keeps job startup time small since there http://git-wip-us.apache.org/repos/asf/samza/blob/8b52e8ae/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala index 4f15002..c84ceb7 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala @@ -25,9 +25,18 @@ import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.config.KafkaConfig.Config2Kafka import org.apache.kafka.clients.producer.KafkaProducer import org.apache.samza.system.SystemFactory +import org.apache.samza.config.StorageConfig._ import org.I0Itec.zkclient.ZkClient import kafka.utils.ZKStringSerializer +object KafkaSystemFactory extends Logging { + def getInjectedProducerProperties(systemName: String, config: Config) = if (config.isChangelogSystem(systemName)) { + warn("System name '%s' is being used as a changelog. Disabling compression since Kafka does not support compression for log compacted topics." format systemName) + Map[String, String]("compression.type" -> "none") + } else { + Map[String, String]() + } +} class KafkaSystemFactory extends SystemFactory with Logging { def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = { @@ -66,7 +75,8 @@ class KafkaSystemFactory extends SystemFactory with Logging { def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = { val clientId = KafkaUtil.getClientId("samza-producer", config) - val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId) + val injectedProps = KafkaSystemFactory.getInjectedProducerProperties(systemName, config) + val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId, injectedProps) val getProducer = () => { new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties) } val metrics = new KafkaSystemProducerMetrics(systemName, registry) http://git-wip-us.apache.org/repos/asf/samza/blob/8b52e8ae/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala index 5f65144..ce84b6d 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala @@ -21,6 +21,7 @@ package org.apache.samza.system.kafka import org.apache.samza.SamzaException import org.apache.samza.config.MapConfig +import org.apache.samza.config.StorageConfig import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.system.SystemStream import org.junit.Assert._ @@ -83,4 +84,16 @@ class TestKafkaSystemFactory { assertNotNull(producer) assertTrue(producer.isInstanceOf[KafkaSystemProducer]) } + + @Test + def testInjectedProducerProps { + val configMap = Map[String, String]( + StorageConfig.FACTORY.format("system1") -> "some.factory.Class", + StorageConfig.CHANGELOG_STREAM.format("system1") -> "system1.stream1", + StorageConfig.FACTORY.format("system2") -> "some.factory.Class") + val config = new MapConfig(configMap) + assertEquals(Map[String, String](), KafkaSystemFactory.getInjectedProducerProperties("system3", config)) + assertEquals(Map[String, String](), KafkaSystemFactory.getInjectedProducerProperties("system2", config)) + assertEquals(Map[String, String]("compression.type" -> "none"), KafkaSystemFactory.getInjectedProducerProperties("system1", config)) + } }
