Repository: incubator-samza Updated Branches: refs/heads/master 8cd3ff12d -> c96ecc68b
SAMZA-44; change state topic phrasing in kafka checkpoint manager to be checkpoint topic Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/c96ecc68 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/c96ecc68 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/c96ecc68 Branch: refs/heads/master Commit: c96ecc68bf67d919e8d42265acc70fd4b5ea0b51 Parents: 8cd3ff1 Author: Zhijie Shen <[email protected]> Authored: Mon Mar 24 10:26:17 2014 -0700 Committer: Chris Riccomini <[email protected]> Committed: Mon Mar 24 10:26:17 2014 -0700 ---------------------------------------------------------------------- .../kafka/KafkaCheckpointManager.scala | 66 ++++++++++---------- .../kafka/KafkaCheckpointManagerFactory.scala | 4 +- .../checkpoint/TestKafkaCheckpointManager.scala | 4 +- 3 files changed, 37 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/c96ecc68/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala index 64e882b..e98b50a 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala @@ -56,7 +56,7 @@ import org.apache.samza.util.ExponentialSleepStrategy */ class KafkaCheckpointManager( clientId: String, - stateTopic: String, + checkpointTopic: String, systemName: String, totalPartitions: Int, replicationFactor: Int, @@ -72,7 +72,7 @@ class KafkaCheckpointManager( var partitions = Set[Partition]() var producer: Producer[Partition, Array[Byte]] = null - info("Creating KafkaCheckpointManager with: clientId=%s, stateTopic=%s, systemName=%s" format (clientId, stateTopic, systemName)) + info("Creating KafkaCheckpointManager with: clientId=%s, checkpointTopic=%s, systemName=%s" format (clientId, checkpointTopic, systemName)) def writeCheckpoint(partition: Partition, checkpoint: Checkpoint) { retryBackoff.run( @@ -80,7 +80,7 @@ class KafkaCheckpointManager( if (producer == null) { producer = connectProducer() } - producer.send(new KeyedMessage(stateTopic, null, partition, serde.toBytes(checkpoint))) + producer.send(new KeyedMessage(checkpointTopic, null, partition, serde.toBytes(checkpoint))) loop.done }, @@ -100,10 +100,10 @@ class KafkaCheckpointManager( val checkpoint = retryBackoff.run( loop => { - // Assume state topic exists with correct partitions, since it should be verified on start. - // Fetch the metadata for this state topic/partition pair. - val metadataMap = TopicMetadataCache.getTopicMetadata(Set(stateTopic), systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics)) - val metadata = metadataMap(stateTopic) + // Assume checkpoint topic exists with correct partitions, since it should be verified on start. + // Fetch the metadata for this checkpoint topic/partition pair. + val metadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics)) + val metadata = metadataMap(checkpointTopic) val partitionMetadata = metadata.partitionsMetadata .filter(_.partitionId == partition.getPartitionId) .headOption @@ -111,9 +111,9 @@ class KafkaCheckpointManager( val partitionId = partitionMetadata.partitionId val leader = partitionMetadata .leader - .getOrElse(throw new SamzaException("No leader available for topic %s" format stateTopic)) + .getOrElse(throw new SamzaException("No leader available for topic %s" format checkpointTopic)) - info("Connecting to leader %s:%d for topic %s and partition %s to fetch last checkpoint message." format (leader.host, leader.port, stateTopic, partitionId)) + info("Connecting to leader %s:%d for topic %s and partition %s to fetch last checkpoint message." format (leader.host, leader.port, checkpointTopic, partitionId)) val consumer = new SimpleConsumer( leader.host, @@ -122,11 +122,11 @@ class KafkaCheckpointManager( bufferSize, clientId) try { - val topicAndPartition = new TopicAndPartition(stateTopic, partitionId) + val topicAndPartition = new TopicAndPartition(checkpointTopic, partitionId) val offsetResponse = consumer.getOffsetsBefore(new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))) .partitionErrorAndOffsets .get(topicAndPartition) - .getOrElse(throw new KafkaCheckpointException("Unable to find offset information for %s:%d" format (stateTopic, partitionId))) + .getOrElse(throw new KafkaCheckpointException("Unable to find offset information for %s:%d" format (checkpointTopic, partitionId))) // Fail or retry if there was an an issue with the offset request. ErrorMapping.maybeThrowException(offsetResponse.error) @@ -134,40 +134,40 @@ class KafkaCheckpointManager( val offset = offsetResponse .offsets .headOption - .getOrElse(throw new KafkaCheckpointException("Got response, but no offsets defined for %s:%d" format (stateTopic, partitionId))) + .getOrElse(throw new KafkaCheckpointException("Got response, but no offsets defined for %s:%d" format (checkpointTopic, partitionId))) - info("Got offset %s for topic %s and partition %s. Attempting to fetch message." format (offset, stateTopic, partitionId)) + info("Got offset %s for topic %s and partition %s. Attempting to fetch message." format (offset, checkpointTopic, partitionId)) if (offset <= 0) { - info("Got offset 0 (no messages in state topic) for topic %s and partition %s, so returning null. If you expected the state topic to have messages, you're probably going to lose data." format (stateTopic, partition)) + info("Got offset 0 (no messages in checkpoint topic) for topic %s and partition %s, so returning null. If you expected the checkpoint topic to have messages, you're probably going to lose data." format (checkpointTopic, partition)) return null } val request = new FetchRequestBuilder() // Kafka returns 1 greater than the offset of the last message in // the topic, so subtract one to fetch the last message. - .addFetch(stateTopic, partitionId, offset - 1, fetchSize) + .addFetch(checkpointTopic, partitionId, offset - 1, fetchSize) .maxWait(500) .minBytes(1) .clientId(clientId) .build val messageSet = consumer.fetch(request) if (messageSet.hasError) { - warn("Got error code from broker for %s: %s" format (stateTopic, messageSet.errorCode(stateTopic, partitionId))) - val errorCode = messageSet.errorCode(stateTopic, partitionId) + warn("Got error code from broker for %s: %s" format (checkpointTopic, messageSet.errorCode(checkpointTopic, partitionId))) + val errorCode = messageSet.errorCode(checkpointTopic, partitionId) if (ErrorMapping.OffsetOutOfRangeCode.equals(errorCode)) { - warn("Got an offset out of range exception while getting last checkpoint for topic %s and partition %s, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." format (stateTopic, partitionId)) + warn("Got an offset out of range exception while getting last checkpoint for topic %s and partition %s, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." format (checkpointTopic, partitionId)) return null } ErrorMapping.maybeThrowException(errorCode) } - val messages = messageSet.messageSet(stateTopic, partitionId).toList + val messages = messageSet.messageSet(checkpointTopic, partitionId).toList if (messages.length != 1) { throw new KafkaCheckpointException("Something really unexpected happened. Got %s " - + "messages back when fetching from state checkpoint topic %s and partition %s. " + + "messages back when fetching from checkpoint topic %s and partition %s. " + "Expected one message. It would be unsafe to go on without the latest checkpoint, " - + "so failing." format (messages.length, stateTopic, partition)) + + "so failing." format (messages.length, checkpointTopic, partition)) } // Some back bending to go from message to checkpoint. @@ -183,7 +183,7 @@ class KafkaCheckpointManager( exception match { case e: KafkaCheckpointException => throw e case e: Exception => - warn("While trying to read last checkpoint for topic %s and partition %s: %s. Retrying." format (stateTopic, partition, e)) + warn("While trying to read last checkpoint for topic %s and partition %s: %s. Retrying." format (checkpointTopic, partition, e)) debug(e) } } @@ -212,31 +212,31 @@ class KafkaCheckpointManager( } private def createTopic { - info("Attempting to create state topic %s with %s partitions." format (stateTopic, totalPartitions)) + info("Attempting to create checkpoint topic %s with %s partitions." format (checkpointTopic, totalPartitions)) retryBackoff.run( loop => { val zkClient = connectZk() try { AdminUtils.createTopic( zkClient, - stateTopic, + checkpointTopic, totalPartitions, replicationFactor) } finally { zkClient.close } - info("Created state topic %s." format stateTopic) + info("Created checkpoint topic %s." format checkpointTopic) loop.done }, (exception, loop) => { exception match { case e: TopicExistsException => - info("State topic %s already exists." format stateTopic) + info("Checkpoint topic %s already exists." format checkpointTopic) loop.done case e: Exception => - warn("Failed to create topic %s: %s. Retrying." format (stateTopic, e)) + warn("Failed to create topic %s: %s. Retrying." format (checkpointTopic, e)) debug(e) } } @@ -244,19 +244,19 @@ class KafkaCheckpointManager( } private def validateTopic { - info("Validating state topic %s." format stateTopic) + info("Validating checkpoint topic %s." format checkpointTopic) retryBackoff.run( loop => { - val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(stateTopic), systemName, metadataStore.getTopicInfo) - val topicMetadata = topicMetadataMap(stateTopic) + val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, metadataStore.getTopicInfo) + val topicMetadata = topicMetadataMap(checkpointTopic) ErrorMapping.maybeThrowException(topicMetadata.errorCode) val partitionCount = topicMetadata.partitionsMetadata.length if (partitionCount != totalPartitions) { - throw new KafkaCheckpointException("State topic validation failed for topic %s because partition count %s did not match expected partition count %s." format (stateTopic, topicMetadata.partitionsMetadata.length, totalPartitions)) + throw new KafkaCheckpointException("Checkpoint topic validation failed for topic %s because partition count %s did not match expected partition count %s." format (checkpointTopic, topicMetadata.partitionsMetadata.length, totalPartitions)) } - info("Successfully validated state topic %s." format stateTopic) + info("Successfully validated checkpoint topic %s." format checkpointTopic) loop.done }, @@ -264,7 +264,7 @@ class KafkaCheckpointManager( exception match { case e: KafkaCheckpointException => throw e case e: Exception => - warn("While trying to validate topic %s: %s. Retrying." format (stateTopic, e)) + warn("While trying to validate topic %s: %s. Retrying." format (checkpointTopic, e)) debug(e) } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/c96ecc68/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala index 2197b01..d45c1e4 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala @@ -72,14 +72,14 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin val brokersListString = Option(producerConfig.brokerList) .getOrElse(throw new SamzaException("No broker list defined in config for %s." format systemName)) val metadataStore = new ClientUtilTopicMetadataStore(brokersListString, clientId) - val stateTopic = getTopic(jobName, jobId) + val checkpointTopic = getTopic(jobName, jobId) // This is a reasonably expensive operation and the TaskInstance already knows the answer. Should use that info. val totalPartitions = Util.getInputStreamPartitions(config).map(_.getPartition).toSet.size new KafkaCheckpointManager( clientId, - stateTopic, + checkpointTopic, systemName, totalPartitions, replicationFactor, http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/c96ecc68/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala index 3f5a609..f1a8f8a 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala @@ -104,7 +104,7 @@ class TestKafkaCheckpointManager { import TestKafkaCheckpointManager._ @Test - def testCheckpointShouldBeNullIfStateTopicDoesNotExistShouldBeCreatedOnWriteAndShouldBeReadableAfterWrite { + def testCheckpointShouldBeNullIfcheckpointTopicDoesNotExistShouldBeCreatedOnWriteAndShouldBeReadableAfterWrite { val kcm = getKafkaCheckpointManager kcm.register(partition) kcm.start @@ -131,7 +131,7 @@ class TestKafkaCheckpointManager { private def getKafkaCheckpointManager = new KafkaCheckpointManager( clientId = "some-client-id", - stateTopic = "state-topic", + checkpointTopic = "checkpoint-topic", systemName = "kafka", totalPartitions = 1, replicationFactor = 3,
