Repository: samza Updated Branches: refs/heads/master 7f1ec6492 -> b989e51ae
SAMZA-1442: use systemAdmin.validateStream in KafkaCheckpointManager Author: Boris S <[email protected]> Reviewers: Jacob Maes <[email protected]> Closes #314 from sborya/CheckpointTopicValidation Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b989e51a Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b989e51a Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b989e51a Branch: refs/heads/master Commit: b989e51ae866fee9cdba76a457992afc70695fdb Parents: 7f1ec64 Author: Boris S <[email protected]> Authored: Thu Oct 5 15:36:37 2017 -0700 Committer: Boris S <[email protected]> Committed: Thu Oct 5 15:36:37 2017 -0700 ---------------------------------------------------------------------- .../org/apache/samza/system/StreamSpec.java | 4 + .../kafka/KafkaCheckpointManager.scala | 23 ++++- .../scala/org/apache/samza/util/KafkaUtil.scala | 88 -------------------- .../kafka/TestKafkaCheckpointManager.scala | 25 ++++-- 4 files changed, 45 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/b989e51a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java index 6ea1a22..523ff68 100644 --- a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java +++ b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java @@ -81,6 +81,10 @@ public class StreamSpec { */ private final Map<String, String> config; + @Override + public String toString() { + return String.format("StreamSpec: id=%s, systemName=%s, pName=%s, partCount=%d.", id, systemName, physicalName, partitionCount); + } /** * @param id The application-unique logical identifier for the stream. It is used to distinguish between * streams in a Samza application so it must be unique in the context of one deployable unit. http://git-wip-us.apache.org/repos/asf/samza/blob/b989e51a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala index 1e22763..4eb6666 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala @@ -29,6 +29,7 @@ import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager} import org.apache.samza.container.TaskName import org.apache.samza.serializers.CheckpointSerde import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata +import org.apache.samza.system.kafka.{KafkaSystemAdmin, KafkaStreamSpec} import org.apache.samza.system.{StreamSpec, SystemAdmin, _} import org.apache.samza.util._ import org.apache.samza.{Partition, SamzaException} @@ -252,8 +253,26 @@ class KafkaCheckpointManager( } override def start { - kafkaUtil.createTopic(checkpointTopic, 1, replicationFactor, checkpointTopicProperties) - kafkaUtil.validateTopicPartitionCount(checkpointTopic, systemName, metadataStore, 1, failOnCheckpointValidation) + val CHECKPOINT_STREAMID = "unused-temp-checkpoint-stream-id" + val spec = new KafkaStreamSpec(CHECKPOINT_STREAMID, + checkpointTopic, systemName, 1, + replicationFactor, checkpointTopicProperties) + + info("About to create checkpoint stream: " + spec) + systemAdmin.createStream(spec) + info("Created checkpoint stream: " + spec) + try { + systemAdmin.validateStream(spec) // SPECIAL VALIDATION FOR CHECKPOINT. DO NOT FAIL IF failOnCheckpointValidation IS FALSE + info("Validated spec: " + spec) + } catch { + case e : StreamValidationException => + if (failOnCheckpointValidation) { + throw e + } else { + warn("Checkpoint stream validation partially failed. Ignoring it because failOnCheckpointValidation=" + failOnCheckpointValidation) + } + case e1 : Exception => throw e1 + } } override def register(taskName: TaskName) { http://git-wip-us.apache.org/repos/asf/samza/blob/b989e51a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala index 39edba7..1410cbb 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala @@ -91,94 +91,6 @@ object KafkaUtil extends Logging { class KafkaUtil(val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy, val connectZk: () => ZkUtils) extends Logging { - /** - * Common code for creating a topic in Kafka - * - * @param topicName Name of the topic to be created - * @param partitionCount Number of partitions in the topic - * @param replicationFactor Number of replicas for the topic - * @param topicProperties Any topic related properties - */ - def createTopic(topicName: String, partitionCount: Int, replicationFactor: Int, topicProperties: Properties = new Properties) { - info("Attempting to create topic %s." format topicName) - retryBackoff.run( - loop => { - val zkClient = connectZk() - try { - AdminUtils.createTopic( - zkClient, - topicName, - partitionCount, - replicationFactor, - topicProperties) - } finally { - zkClient.close - } - - info("Created topic %s." format topicName) - loop.done - }, - - (exception, loop) => { - exception match { - case tee: TopicExistsException => - info("Topic %s already exists." format topicName) - loop.done - case e: Exception => - warn("Failed to create topic %s: %s. Retrying." format(topicName, e)) - debug("Exception detail:", e) - } - } - ) - } - - /** - * Common code to validate partition count in a topic - * - * @param topicName Name of the topic to be validated - * @param systemName Kafka system to use - * @param metadataStore Topic Metadata store - * @param expectedPartitionCount Expected number of partitions - * @param failOnValidation If true - fail the job if the validation fails - */ - def validateTopicPartitionCount(topicName: String, - systemName: String, - metadataStore: TopicMetadataStore, - expectedPartitionCount: Int, - failOnValidation: Boolean = true) { - info("Validating topic %s. Expecting partition count: %d" format (topicName, expectedPartitionCount)) - retryBackoff.run( - loop => { - val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topicName), systemName, metadataStore.getTopicInfo) - val topicMetadata = topicMetadataMap(topicName) - KafkaUtil.maybeThrowException(topicMetadata.errorCode) - - val partitionCount = topicMetadata.partitionsMetadata.length - if (partitionCount != expectedPartitionCount) - { - val msg = "Validation failed for topic %s because partition count %s did not " + - "match expected partition count of %d." format(topicName, partitionCount, expectedPartitionCount) - if (failOnValidation) { - throw new KafkaUtilException(msg) - } else { - warn(msg + " Ignoring the failure.") - } - } - - info("Successfully validated topic %s." format topicName) - loop.done - }, - - (exception, loop) => { - exception match { - case e: KafkaUtilException => throw e - case e: Exception => - warn("While trying to validate topic %s: %s. Retrying." format(topicName, e)) - debug("Exception detail:", e) - } - } - ) - } /** * Code to verify that a topic exists http://git-wip-us.apache.org/repos/asf/samza/blob/b989e51a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala index 43b912d..3337a36 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala @@ -19,6 +19,9 @@ package org.apache.samza.checkpoint.kafka + +import java.util.Properties + import _root_.kafka.admin.AdminUtils import _root_.kafka.common.{InvalidMessageSizeException, UnknownTopicOrPartitionException} import _root_.kafka.integration.KafkaServerTestHarness @@ -33,6 +36,7 @@ import org.apache.samza.container.TaskName import org.apache.samza.container.grouper.stream.GroupByPartitionFactory import org.apache.samza.serializers.CheckpointSerde import org.apache.samza.system._ +import org.apache.samza.system.kafka.{KafkaSystemAdmin, KafkaStreamSpec} import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtilException, NoOpMetricsRegistry, TopicMetadataStore} import org.apache.samza.{Partition, SamzaException} import org.junit.Assert._ @@ -71,6 +75,12 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { var systemConsumerFn: ()=>SystemConsumer = ()=>{null} var systemProducerFn: ()=>SystemProducer = ()=>{null} var systemAdminFn: ()=>SystemAdmin = ()=>{null} + + val systemName = "kafka" + val CHECKPOINT_STREAMID = "unused-temp-checkpoint-stream-id" + val kafkaStreamSpec = new KafkaStreamSpec(CHECKPOINT_STREAMID, + checkpointTopic, systemName, 1, + 1, new Properties()) @Before override def setUp { @@ -78,7 +88,6 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { TestUtils.waitUntilTrue(() => servers.head.metadataCache.getAliveBrokers.size == numBrokers, "Wait for cache to update") - val systemName = "kafka" val brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",") val config = new java.util.HashMap[String, String]() config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) @@ -102,6 +111,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { systemConsumerFn = () => {systemConsumerFactory.getConsumer(systemStreamName, cfg, new NoOpMetricsRegistry())} systemProducerFn = () => {systemConsumerFactory.getProducer(systemStreamName, cfg, new NoOpMetricsRegistry())} systemAdminFn = () => {systemConsumerFactory.getAdmin(systemStreamName, cfg)} + } @After @@ -153,7 +163,8 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { val taskName = new TaskName(partition.toString) kcm.register(taskName) createCheckpointTopic() - kcm.kafkaUtil.validateTopicPartitionCount(checkpointTopic, "kafka", metadataStore, 1) + val systemAdmin = systemAdminFn() + systemAdmin.validateStream(kafkaStreamSpec) // check that log compaction is enabled. val zkClient = ZkUtils(zkConnect, 6000, 6000, zkSecure) @@ -194,7 +205,9 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { val taskName = new TaskName(partition.toString) kcm.register(taskName) createCheckpointTopic() - kcm.kafkaUtil.validateTopicPartitionCount(checkpointTopic, "kafka", metadataStore, 1) + val systemAdmin = systemAdminFn() + systemAdmin.validateStream(kafkaStreamSpec) + // check that log compaction is enabled. val zkClient = ZkUtils(zkConnect, 6000, 6000, zkSecure) @@ -234,7 +247,9 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { val taskName = new TaskName(partition.toString) kcm.register(taskName) createCheckpointTopic(serdeCheckpointTopic) - kcm.kafkaUtil.validateTopicPartitionCount(serdeCheckpointTopic, "kafka", metadataStore, 1) + val systemAdmin = systemAdminFn() + systemAdmin.validateStream(kafkaStreamSpec) + writeCheckpoint(taskName, cp1, serdeCheckpointTopic) // because serde will throw unrecoverable errors, it should result a KafkaCheckpointException try { @@ -261,7 +276,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { kcm.start fail("Expected a KafkaUtilException for invalid number of partitions in the topic.") }catch { - case e: KafkaUtilException => None + case e: StreamValidationException => None } kcm.stop
