Repository: incubator-samza Updated Branches: refs/heads/0.7.0 6e981b43c -> a3a368893
SAMZA-296: Configuration cleanup Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/a3a36889 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/a3a36889 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/a3a36889 Branch: refs/heads/0.7.0 Commit: a3a3688936fdc700a9a8b8728f913aa1c63ec803 Parents: 6e981b4 Author: Martin Kleppmann <[email protected]> Authored: Wed Jun 18 10:07:57 2014 -0700 Committer: Martin Kleppmann <[email protected]> Committed: Wed Jun 18 11:47:28 2014 -0700 ---------------------------------------------------------------------- .../0.7.0/yarn/application-master.md | 2 +- .../org/apache/samza/config/SystemConfig.scala | 2 +- .../org/apache/samza/config/TaskConfig.scala | 3 -- .../org/apache/samza/config/KafkaConfig.scala | 6 --- samza-test/src/main/resources/common.properties | 42 -------------------- .../org/apache/samza/config/YarnConfig.scala | 34 ++++------------ .../yarn/TestSamzaAppMasterTaskManager.scala | 2 +- 7 files changed, 11 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a3a36889/docs/learn/documentation/0.7.0/yarn/application-master.md ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/0.7.0/yarn/application-master.md b/docs/learn/documentation/0.7.0/yarn/application-master.md index 30b14cd..6b81805 100644 --- a/docs/learn/documentation/0.7.0/yarn/application-master.md +++ b/docs/learn/documentation/0.7.0/yarn/application-master.md @@ -41,7 +41,7 @@ From this point on, the ApplicationMaster just reacts to events from the RM. ### Fault Tolerance -Whenever a container is allocated, the AM will work with the YARN NM to start a SamzaContainer (with appropriate partitions assigned to it) in the container. If a container fails with a non-zero return code, the AM will request a new container, and restart the SamzaContainer. If a SamzaContainer fails too many times, too quickly, the ApplicationMaster will fail the whole Samza job with a non-zero return code. See the yarn.countainer.retry.count and yarn.container.retry.window.ms [configuration](../jobs/configuration.html) parameters for details. +Whenever a container is allocated, the AM will work with the YARN NM to start a SamzaContainer (with appropriate partitions assigned to it) in the container. If a container fails with a non-zero return code, the AM will request a new container, and restart the SamzaContainer. If a SamzaContainer fails too many times, too quickly, the ApplicationMaster will fail the whole Samza job with a non-zero return code. See the yarn.container.retry.count and yarn.container.retry.window.ms [configuration](../jobs/configuration.html) parameters for details. When the AM receives a reboot signal from YARN, it will throw a SamzaException. This will trigger a clean and successful shutdown of the AM (YARN won't think the AM failed). http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a3a36889/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala index 5bb17c7..4cfdcc2 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala @@ -49,7 +49,7 @@ class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging { */ def getSystemNames() = { val subConf = config.subset("systems.", true) - // find all .samza.partition.manager keys, and strip the suffix + // find all .samza.factory keys, and strip the suffix subConf.keys.filter(k => k.endsWith(".samza.factory")).map(_.replace(".samza.factory", "")) } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a3a36889/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala index 3510f1f..18a9510 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala @@ -32,7 +32,6 @@ object TaskConfig { val LIFECYCLE_LISTENERS = "task.lifecycle.listeners" // li-generator,foo val LIFECYCLE_LISTENER = "task.lifecycle.listener.%s.class" // task.lifecycle.listener.li-generator.class val CHECKPOINT_MANAGER_FACTORY = "task.checkpoint.factory" // class name to use when sending offset checkpoints - val TASK_JMX_ENABLED = "task.jmx.enabled" // Start up a JMX server for this task? val MESSAGE_CHOOSER_CLASS_NAME = "task.chooser.class" implicit def Config2Task(config: Config) = new TaskConfig(config) @@ -70,7 +69,5 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) { def getCheckpointManagerFactory() = getOption(TaskConfig.CHECKPOINT_MANAGER_FACTORY) - def getJmxServerEnabled = getBoolean(TaskConfig.TASK_JMX_ENABLED, true) - def getMessageChooserClass = getOption(TaskConfig.MESSAGE_CHOOSER_CLASS_NAME) } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a3a36889/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index 4deabd3..91e8e49 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -33,9 +33,6 @@ object KafkaConfig { val CHECKPOINT_SYSTEM = "task.checkpoint.system" val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor" - val CONSUMER_KEY_DESERIALIZER = SystemConfig.SYSTEM_PREFIX + "consumer.key.deserializer.class" - val CONSUMER_MSG_DESERIALIZER = SystemConfig.SYSTEM_PREFIX + "consumer.deserializer.class" - /** * Defines how low a queue can get for a single system/stream/partition * combination before trying to fetch more messages for it. @@ -51,8 +48,6 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { def getCheckpointReplicationFactor() = getOption(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR) // custom consumer config - def getConsumerKeyDeserializerClass(name: String) = getOption(KafkaConfig.CONSUMER_KEY_DESERIALIZER format name) - def getConsumerMsgDeserializerClass(name: String) = getOption(KafkaConfig.CONSUMER_MSG_DESERIALIZER format name) def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name) /** @@ -61,7 +56,6 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { */ def getAutoOffsetResetTopics(systemName: String) = { val subConf = config.subset("systems.%s.streams." format systemName, true) - // find all .samza.partition.manager keys, and strip the suffix subConf .filterKeys(k => k.endsWith(".consumer.auto.offset.reset")) .map { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a3a36889/samza-test/src/main/resources/common.properties ---------------------------------------------------------------------- diff --git a/samza-test/src/main/resources/common.properties b/samza-test/src/main/resources/common.properties deleted file mode 100644 index 6e0c061..0000000 --- a/samza-test/src/main/resources/common.properties +++ /dev/null @@ -1,42 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -job.factory.class=samza.job.local.LocalJobFactory - -task.checkpoint.factory=samza.checkpoint.kafka.KafkaCheckpointManagerFactory -task.checkpoint.system=kafka-checkpoints -task.checkpoint.replication.factor=1 - -serializers.registry.string.class=samza.serializers.StringSerdeFactory - -# Kafka System -systems.kafka.samza.factory=samza.system.kafka.KafkaSystemFactory -systems.kafka.samza.partition.manager=samza.stream.kafka.KafkaPartitionManager -systems.kafka.consumer.zookeeper.connect=localhost:2181 -systems.kafka.consumer.auto.offset.reset=smallest -systems.kafka.producer.metadata.broker.list=localhost:9092 -systems.kafka.samza.msg.serde=string - -# Checkpoints System -systems.kafka-checkpoints.samza.factory=samza.system.kafka.KafkaSystemFactory -systems.kafka-checkpoints.serializer.class=samza.task.state.KafkaCheckpointEncoder -systems.kafka-checkpoints.partitioner.class=samza.task.state.KafkaCheckpointPartitioner -systems.kafka-checkpoints.key.serializer.class=kafka.serializer.NullEncoder -systems.kafka-checkpoints.producer.metadata.broker.list=localhost:9092 -systems.kafka-checkpoints.consumer.zookeeper.connect=localhost:2181 -systems.kafka-checkpoints.producer.type=sync - http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a3a36889/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala b/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala index 6c3aa92..e7868a6 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala @@ -24,7 +24,7 @@ object YarnConfig { val PACKAGE_PATH = "yarn.package.path" val CONTAINER_MAX_MEMORY_MB = "yarn.container.memory.mb" val CONTAINER_MAX_CPU_CORES = "yarn.container.cpu.cores" - val CONTAINER_RETRY_COUNT = "yarn.countainer.retry.count" + val CONTAINER_RETRY_COUNT = "yarn.container.retry.count" val CONTAINER_RETRY_WINDOW_MS = "yarn.container.retry.window.ms" val TASK_COUNT = "yarn.container.count" val AM_JVM_OPTIONS = "yarn.am.opts" @@ -35,39 +35,21 @@ object YarnConfig { } class YarnConfig(config: Config) extends ScalaMapConfig(config) { - def getContainerMaxMemoryMb: Option[Int] = getOption(YarnConfig.CONTAINER_MAX_MEMORY_MB) match { - case Some(mem) => Some(mem.toInt) - case _ => None - } + def getContainerMaxMemoryMb: Option[Int] = getOption(YarnConfig.CONTAINER_MAX_MEMORY_MB).map(_.toInt) - def getContainerMaxCpuCores: Option[Int] = getOption(YarnConfig.CONTAINER_MAX_CPU_CORES) match { - case Some(cpu) => Some(cpu.toInt) - case _ => None - } + def getContainerMaxCpuCores: Option[Int] = getOption(YarnConfig.CONTAINER_MAX_CPU_CORES).map(_.toInt) - def getContainerRetryCount: Option[Int] = getOption(YarnConfig.CONTAINER_RETRY_COUNT) match { - case Some(count) => Some(count.toInt) - case _ => None - } + def getContainerRetryCount: Option[Int] = getOption(YarnConfig.CONTAINER_RETRY_COUNT).map(_.toInt) - def getContainerRetryWindowMs: Option[Int] = getOption(YarnConfig.CONTAINER_RETRY_WINDOW_MS) match { - case Some(retryWindowMs) => Some(retryWindowMs.toInt) - case _ => None - } + def getContainerRetryWindowMs: Option[Int] = getOption(YarnConfig.CONTAINER_RETRY_WINDOW_MS).map(_.toInt) def getPackagePath = getOption(YarnConfig.PACKAGE_PATH) - def getTaskCount: Option[Int] = getOption(YarnConfig.TASK_COUNT) match { - case Some(tc) => Some(tc.toInt) - case _ => None - } + def getTaskCount: Option[Int] = getOption(YarnConfig.TASK_COUNT).map(_.toInt) def getAmOpts = getOption(YarnConfig.AM_JVM_OPTIONS) - - def getAMContainerMaxMemoryMb: Option[Int] = getOption(YarnConfig.AM_CONTAINER_MAX_MEMORY_MB) match { - case Some(mem) => Some(mem.toInt) - case _ => None - } + + def getAMContainerMaxMemoryMb: Option[Int] = getOption(YarnConfig.AM_CONTAINER_MAX_MEMORY_MB).map(_.toInt) def getJmxServerEnabled = getBoolean(YarnConfig.AM_JMX_ENABLED, true) } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a3a36889/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala index 9d832ae..4cf2d49 100644 --- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala +++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala @@ -139,7 +139,7 @@ class TestSamzaAppMasterTaskManager { "task.inputs" -> "test-system.test-stream", "systems.test-system.samza.key.serde" -> "org.apache.samza.serializers.JsonSerde", "systems.test-system.samza.msg.serde" -> "org.apache.samza.serializers.JsonSerde", - "yarn.countainer.retry.count" -> "1", + "yarn.container.retry.count" -> "1", "yarn.container.retry.window.ms" -> "1999999999")) @Test
