Repository: incubator-samza Updated Branches: refs/heads/master cdd2ba7c0 -> d2bf10eb5
SAMZA-399; make task.checkpoint.segment.bytes configurable Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/d2bf10eb Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/d2bf10eb Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/d2bf10eb Branch: refs/heads/master Commit: d2bf10eb585df1fa566a2e5314dcdf485561e19f Parents: cdd2ba7 Author: Chris Riccomini <[email protected]> Authored: Tue Aug 26 13:43:06 2014 -0700 Committer: Chris Riccomini <[email protected]> Committed: Tue Aug 26 13:43:06 2014 -0700 ---------------------------------------------------------------------- .../versioned/jobs/configuration-table.html | 10 ++++++++++ .../kafka/KafkaCheckpointManagerFactory.scala | 14 ++++++++++---- .../scala/org/apache/samza/config/KafkaConfig.scala | 2 ++ .../checkpoint/kafka/TestKafkaCheckpointManager.scala | 6 ++++-- 4 files changed, 26 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d2bf10eb/docs/learn/documentation/versioned/jobs/configuration-table.html ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 0c74167..526ca9f 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -752,6 +752,16 @@ </tr> <tr> + <td class="property" id="task-checkpoint-segment-bytes">task.checkpoint.<br>segment.bytes</td> + <td class="default">26214400</td> + <td class="description"> + If you are using Kafka for checkpoints, this is the segment size to be used for the checkpoint + topic's log segments. 
Keeping this number small is useful because it increases the frequency + with which Kafka garbage-collects old checkpoints. + </td> + </tr> + + <tr> <th colspan="3" class="section" id="regex-rewriter"> Consuming all Kafka topics matching a regular expression<br> <span class="subtitle"> http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d2bf10eb/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala index 7ab50a3..f7db2a1 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala @@ -53,9 +53,15 @@ object KafkaCheckpointManagerFactory { // enable log compaction. This keeps job startup time small since there // are fewer useless (overwritten) messages to read from the checkpoint // topic.
- val CHECKPOINT_TOPIC_PROPERTIES = (new Properties /: Map( - "cleanup.policy" -> "compact", - "segment.bytes" -> "26214400")) { case (props, (k, v)) => props.put(k, v); props } + def getCheckpointTopicProperties(config: Config) = { + val segmentBytes = config + .getCheckpointSegmentBytes + .getOrElse("26214400") + + (new Properties /: Map( + "cleanup.policy" -> "compact", + "segment.bytes" -> segmentBytes)) { case (props, (k, v)) => props.put(k, v); props } + } } class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Logging { @@ -106,7 +112,7 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin connectProducer, connectZk, systemStreamPartitionGrouperFactoryString, - checkpointTopicProperties = CHECKPOINT_TOPIC_PROPERTIES) + checkpointTopicProperties = getCheckpointTopicProperties(config)) } private def getTopic(jobName: String, jobId: String) = http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d2bf10eb/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index bdb416d..9fc1f56 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -32,6 +32,7 @@ object KafkaConfig { val CHECKPOINT_SYSTEM = "task.checkpoint.system" val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor" + val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes" /** * Defines how low a queue can get for a single system/stream/partition @@ -46,6 +47,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { // checkpoints def getCheckpointSystem = getOption(KafkaConfig.CHECKPOINT_SYSTEM) def getCheckpointReplicationFactor() = 
getOption(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR) + def getCheckpointSegmentBytes() = getOption(KafkaConfig.CHECKPOINT_SEGMENT_BYTES) // custom consumer config def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d2bf10eb/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala index f556479..4827731 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala @@ -44,6 +44,8 @@ import scala.collection.JavaConversions._ import scala.collection._ import org.apache.samza.container.grouper.stream.GroupByPartitionFactory import kafka.admin.AdminUtils +import org.apache.samza.config.MapConfig +import scala.collection.JavaConversions._ object TestKafkaCheckpointManager { val checkpointTopic = "checkpoint-topic" @@ -177,7 +179,7 @@ class TestKafkaCheckpointManager { connectProducer = () => new Producer[Array[Byte], Array[Byte]](producerConfig), connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer), systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString, - checkpointTopicProperties = KafkaCheckpointManagerFactory.CHECKPOINT_TOPIC_PROPERTIES) + checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]()))) // inject serde. 
Kafka exceptions will be thrown when serde.fromBytes is called private def getKafkaCheckpointManagerWithInvalidSerde(exception: String) = new KafkaCheckpointManager( @@ -193,7 +195,7 @@ class TestKafkaCheckpointManager { connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer), systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString, serde = new InvalideSerde(exception), - checkpointTopicProperties = KafkaCheckpointManagerFactory.CHECKPOINT_TOPIC_PROPERTIES) + checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]()))) class InvalideSerde(exception: String) extends CheckpointSerde { override def fromBytes(bytes: Array[Byte]): Checkpoint = {
