Repository: samza
Updated Branches:
  refs/heads/master 725a52603 -> f4bd84bbb
SAMZA-964 - Improve the performance of the continuous OFFSET checkpointing for logged stores

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f4bd84bb
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f4bd84bb
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f4bd84bb

Branch: refs/heads/master
Commit: f4bd84bbb7fe441355392d7ef0e920bb0b794f96
Parents: 725a526
Author: Jacob Maes <[email protected]>
Authored: Tue Jun 14 16:42:52 2016 -0700
Committer: Navina Ramesh <[email protected]>
Committed: Tue Jun 14 16:42:52 2016 -0700

----------------------------------------------------------------------
 .../samza/system/ExtendedSystemAdmin.java       |   5 +-
 .../stream/CoordinatorStreamSystemConsumer.java |   1 +
 .../samza/storage/TaskStorageManager.scala      |  60 +++++----
 .../samza/system/StreamMetadataCache.scala      |   2 +-
 .../samza/coordinator/TestJobCoordinator.scala  |   7 +-
 .../samza/storage/TestTaskStorageManager.scala  |  38 ++++++
 .../samza/system/kafka/KafkaSystemAdmin.scala   | 102 +++++++++++++--
 .../system/kafka/TestKafkaSystemAdmin.scala     | 123 ++++++++++++-------
 .../samza/storage/kv/RocksDbKeyValueStore.scala |   3 +-
 .../apache/samza/storage/kv/CachedStore.scala   |   5 +-
 .../apache/samza/storage/kv/LoggedStore.scala   |   3 +-
 .../storage/kv/SerializedKeyValueStore.scala    |   3 +-
 12 files changed, 267 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/f4bd84bb/samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java
index daa2212..ac5b1aa 100644
--- a/samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java
@@ -26,5 +26,8 @@ import java.util.Set;
  * TODO: Merge this interface method with SystemAdmin when we upgrade to JDK 1.8
  */
 public interface ExtendedSystemAdmin extends SystemAdmin {
-  Map<String, SystemStreamMetadata> getSystemStreamPartitionCounts(Set<String> streamNames);
+  Map<String, SystemStreamMetadata> getSystemStreamPartitionCounts(Set<String> streamNames, long cacheTTL);
+
+  // Makes fewer offset requests than getSystemStreamMetadata
+  String getNewestOffset(SystemStreamPartition ssp, Integer maxRetries);
 }
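For context: callers are expected to probe for this richer interface and fall back to full metadata otherwise. A minimal sketch of that dispatch (the helper name is illustrative, not part of this patch; the same instanceof check appears in the TaskStorageManager change below):

    import org.apache.samza.system.{ExtendedSystemAdmin, SystemAdmin, SystemStreamPartition}
    import scala.collection.JavaConversions._

    // Hypothetical helper: take the targeted getNewestOffset() path when the
    // admin supports it; otherwise fall back to fetching oldest/newest/upcoming
    // offsets for every partition of the stream.
    def newestOffset(admin: SystemAdmin, ssp: SystemStreamPartition): String = admin match {
      case extended: ExtendedSystemAdmin =>
        extended.getNewestOffset(ssp, 3) // single-SSP request, bounded retries
      case plain =>
        plain.getSystemStreamMetadata(Set(ssp.getStream))
          .get(ssp.getStream)
          .getSystemStreamPartitionMetadata
          .get(ssp.getPartition)
          .getNewestOffset
    }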
http://git-wip-us.apache.org/repos/asf/samza/blob/f4bd84bb/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
index 0a6661c..c343865 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
@@ -91,6 +91,7 @@ public class CoordinatorStreamSystemConsumer {
       String streamName = coordinatorSystemStreamPartition.getStream();
       streamNames.add(streamName);
       Map<String, SystemStreamMetadata> systemStreamMetadataMap = systemAdmin.getSystemStreamMetadata(streamNames);
+      log.info(String.format("Got metadata %s", systemStreamMetadataMap.toString()));
 
       if (systemStreamMetadataMap == null) {
         throw new SamzaException("Received a null systemStreamMetadataMap from the systemAdmin. This is illegal.");


http://git-wip-us.apache.org/repos/asf/samza/blob/f4bd84bb/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index c7b0520..2a3535e 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -218,30 +218,48 @@ class TaskStorageManager(
    */
   private def flushChangelogOffsetFiles() {
     debug("Persisting logged key value stores")
-    changeLogSystemStreams.foreach { case (store, systemStream) => {
-      val streamToMetadata = systemAdmins(systemStream.getSystem)
-        .getSystemStreamMetadata(JavaConversions.setAsJavaSet(Set(systemStream.getStream)))
-      val sspMetadata = streamToMetadata
-        .get(systemStream.getStream)
-        .getSystemStreamPartitionMetadata
-        .get(partition)
-      val newestOffset = sspMetadata.getNewestOffset
-
-      if (newestOffset != null) {
-        val offsetFile = new File(TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, store, taskName), offsetFileName)
-
-        try {
+
+    for ((storeName, systemStream) <- changeLogSystemStreams) {
+      val systemAdmin = systemAdmins
+        .getOrElse(systemStream.getSystem,
+          throw new SamzaException("Unable to get systemAdmin for store " + storeName + " and systemStream" + systemStream))
+
+      debug("Fetching newest offset for store %s" format(storeName))
+      try {
+        val newestOffset = if (systemAdmin.isInstanceOf[ExtendedSystemAdmin]) {
+          // This approach is much more efficient because it only fetches the newest offset for 1 SSP
+          // rather than newest and oldest offsets for all SSPs. Use it if we can.
+          systemAdmin.asInstanceOf[ExtendedSystemAdmin].getNewestOffset(new SystemStreamPartition(systemStream.getSystem, systemStream.getStream, partition), 3)
+        } else {
+          val streamToMetadata = systemAdmins(systemStream.getSystem)
+            .getSystemStreamMetadata(JavaConversions.setAsJavaSet(Set(systemStream.getStream)))
+          val sspMetadata = streamToMetadata
+            .get(systemStream.getStream)
+            .getSystemStreamPartitionMetadata
+            .get(partition)
+          sspMetadata.getNewestOffset
+        }
+        debug("Got offset %s for store %s" format(newestOffset, storeName))
+
+        val offsetFile = new File(TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName), offsetFileName)
+        if (newestOffset != null) {
+          debug("Storing offset for store in OFFSET file ")
           Util.writeDataToFile(offsetFile, newestOffset)
-          debug("Successfully stored offset %s for store %s in OFFSET file " format(newestOffset, store))
-        } catch {
-          case e: Exception => error("Exception storing offset %s for store %s" format(newestOffset, store), e)
+          debug("Successfully stored offset %s for store %s in OFFSET file " format(newestOffset, storeName))
+        } else {
+          //if newestOffset is null, then it means the store is (or has become) empty. No need to persist the offset file
+          if (offsetFile.exists()) {
+            Util.rm(offsetFile)
+          }
+          debug("Not storing OFFSET file for taskName %s. Store %s backed by changelog topic : %s, partition: %s is empty. " format (taskName, storeName, systemStream.getStream, partition.getPartitionId))
         }
+      } catch {
+        case e: Exception => error("Exception storing offset for store %s. Skipping." format(storeName), e)
       }
-      else {
-        //if newestOffset is null, then it means the store is empty. No need to persist the offset file
-        debug("Not storing OFFSET file for taskName %s. Store %s backed by changelog topic : %s, partition: %s is empty. " format (taskName, store, systemStream.getStream, partition.getPartitionId))
-      }
-    }}
+
+    }
+
+    debug("Done persisting logged key value stores")
   }
 
   /**
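The OFFSET file contract introduced here (write on a non-null offset, delete when the changelog partition is empty) can be exercised in isolation. A small sketch using the same Util helpers the patch calls; the path is hypothetical:

    import java.io.File
    import org.apache.samza.util.Util

    // Illustrative only: mirrors the branch above. A non-null newest offset is
    // persisted to the OFFSET file; a null offset means the changelog partition
    // is empty, so any stale OFFSET file must be removed.
    val offsetFile = new File("/tmp/state/loggedStore/Partition_0/OFFSET") // hypothetical path

    def persistOffset(newestOffset: String) {
      if (newestOffset != null) {
        Util.writeDataToFile(offsetFile, newestOffset)
      } else if (offsetFile.exists()) {
        Util.rm(offsetFile)
      }
    }

    persistOffset("100")   // OFFSET now contains "100"
    persistOffset(null)    // store became empty: OFFSET is deleted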
" format (taskName, storeName, systemStream.getStream, partition.getPartitionId)) } + } catch { + case e: Exception => error("Exception storing offset for store %s. Skipping." format(storeName), e) } - else { - //if newestOffset is null, then it means the store is empty. No need to persist the offset file - debug("Not storing OFFSET file for taskName %s. Store %s backed by changelog topic : %s, partition: %s is empty. " format (taskName, store, systemStream.getStream, partition.getPartitionId)) - } - }} + + } + + debug("Done persisting logged key value stores") } /** http://git-wip-us.apache.org/repos/asf/samza/blob/f4bd84bb/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala index 18b47ec..918fa53 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala @@ -64,7 +64,7 @@ class StreamMetadataCache ( val systemAdmin = systemAdmins .getOrElse(systemName, throw new SamzaException("Cannot get metadata for unknown system: %s" format systemName)) val streamToMetadata = if (partitionsMetadataOnly && systemAdmin.isInstanceOf[ExtendedSystemAdmin]) { - systemAdmin.asInstanceOf[ExtendedSystemAdmin].getSystemStreamPartitionCounts(systemStreams.map(_.getStream)) + systemAdmin.asInstanceOf[ExtendedSystemAdmin].getSystemStreamPartitionCounts(systemStreams.map(_.getStream), cacheTTLms) } else { systemAdmin.getSystemStreamMetadata(systemStreams.map(_.getStream)) } http://git-wip-us.apache.org/repos/asf/samza/blob/f4bd84bb/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala index ffdb006..55a879b 100644 --- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala +++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala @@ -231,7 +231,8 @@ class MockSystemAdmin extends ExtendedSystemAdmin { override def offsetComparator(offset1: String, offset2: String) = null - override def getSystemStreamPartitionCounts(streamNames: util.Set[String]): util.Map[String, SystemStreamMetadata] = { + override def getSystemStreamPartitionCounts(streamNames: util.Set[String], + cacheTTL: Long): util.Map[String, SystemStreamMetadata] = { assertEquals(1, streamNames.size()) val result = streamNames.map { stream => @@ -244,4 +245,8 @@ class MockSystemAdmin extends ExtendedSystemAdmin { }.toMap result } + + override def getNewestOffset(ssp: SystemStreamPartition, maxRetries: Integer) = null + + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/f4bd84bb/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala index c8ea64c..e126481 100644 --- a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala +++ 
b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala @@ -122,6 +122,10 @@ class TestTaskStorageManager extends MockitoSugar { assertEquals("Found incorrect value in offset file!", "100", Util.readDataFromFile(offsetFilePath)) } + /** + * For instances of SystemAdmin, the store manager should call the slow getSystemStreamMetadata() method + * which gets offsets for ALL n partitions of the changelog, regardless of how many we need for the current task. + */ @Test def testFlushCreatesOffsetFileForLoggedStore() { val partition = new Partition(0) @@ -148,6 +152,40 @@ class TestTaskStorageManager extends MockitoSugar { assertEquals("Found incorrect value in offset file!", "100", Util.readDataFromFile(offsetFilePath)) } + /** + * For instances of ExtendedSystemAdmin, the store manager should call the optimized getNewestOffset() method. + * Flush should also delete the existing OFFSET file if the changelog partition (for some reason) becomes empty + */ + @Test + def testFlushCreatesOffsetFileForLoggedStoreExtendedSystemAdmin() { + val partition = new Partition(0) + + val offsetFilePath = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + File.separator + "OFFSET") + + val mockSystemAdmin = mock[ExtendedSystemAdmin] + when(mockSystemAdmin.getNewestOffset(any(classOf[SystemStreamPartition]), anyInt())).thenReturn("100").thenReturn(null) + + //Build TaskStorageManager + val taskStorageManager = new TaskStorageManagerBuilder() + .addStore(loggedStore) + .setSystemAdmin("kafka", mockSystemAdmin) + .setPartition(partition) + .build + + //Invoke test method + taskStorageManager.flush() + + //Check conditions + assertTrue("Offset file doesn't exist!", offsetFilePath.exists()) + assertEquals("Found incorrect value in offset file!", "100", Util.readDataFromFile(offsetFilePath)) + + //Invoke test method again + taskStorageManager.flush() + + //Check conditions + assertFalse("Offset file for null offset exists!", offsetFilePath.exists()) + } + @Test def testFlushOverwritesOffsetFileForLoggedStore() { val partition = new Partition(0) http://git-wip-us.apache.org/repos/asf/samza/blob/f4bd84bb/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala index 23aa58d..ba8de5c 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala @@ -32,7 +32,7 @@ import kafka.common.{ TopicExistsException, TopicAndPartition } import java.util.{ Properties, UUID } import scala.collection.JavaConversions import scala.collection.JavaConversions._ -import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata +import org.apache.samza.system.SystemStreamMetadata.{OffsetType, SystemStreamPartitionMetadata} import kafka.consumer.ConsumerConfig import kafka.admin.AdminUtils import org.apache.samza.util.KafkaUtil @@ -64,7 +64,8 @@ object KafkaSystemAdmin extends Logging { } .toMap - info("Got metadata: %s" format allMetadata) + // This is typically printed downstream and it can be spammy, so debug level here. 
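The key Mockito idiom in the new test is consecutive stubbing: the same call is stubbed to return different values on successive invocations, which is what drives flush() through both the write-offset and the delete-offset branches. Annotated, from the test above (Mockito 1.x matchers, as used throughout this file):

    // Consecutive stubbing: later thenReturn values apply to later calls,
    // and the last one repeats for all calls after that.
    when(mockSystemAdmin.getNewestOffset(any(classOf[SystemStreamPartition]), anyInt()))
      .thenReturn("100") // first flush(): offset "100" is written to the OFFSET file
      .thenReturn(null)  // second flush(): partition now "empty", OFFSET file deleted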
+ debug("Got metadata: %s" format allMetadata) allMetadata } @@ -72,6 +73,7 @@ object KafkaSystemAdmin extends Logging { /** * A helper class that is used to construct the changelog stream specific information + * * @param replicationFactor The number of replicas for the changelog stream * @param kafkaProps The kafka specific properties that need to be used for changelog stream creation */ @@ -139,18 +141,20 @@ class KafkaSystemAdmin( import KafkaSystemAdmin._ - def getSystemStreamPartitionCounts(streams: util.Set[String]): util.Map[String, SystemStreamMetadata] = { - getSystemStreamPartitionCounts(streams, new ExponentialSleepStrategy(initialDelayMs = 500)) + override def getSystemStreamPartitionCounts(streams: util.Set[String], cacheTTL: Long): util.Map[String, SystemStreamMetadata] = { + getSystemStreamPartitionCounts(streams, new ExponentialSleepStrategy(initialDelayMs = 500), cacheTTL) } - def getSystemStreamPartitionCounts(streams: util.Set[String], retryBackoff: ExponentialSleepStrategy): util.Map[String, SystemStreamMetadata] = { + def getSystemStreamPartitionCounts(streams: util.Set[String], retryBackoff: ExponentialSleepStrategy, cacheTTL: Long = Long.MaxValue): util.Map[String, SystemStreamMetadata] = { debug("Fetching system stream partition count for: %s" format streams) + var metadataTTL = cacheTTL retryBackoff.run( loop => { val metadata = TopicMetadataCache.getTopicMetadata( streams.toSet, systemName, - getTopicMetadata) + getTopicMetadata, + metadataTTL) val result = metadata.map { case (topic, topicMetadata) => { val partitionsMap = topicMetadata.partitionsMetadata.map { @@ -167,6 +171,9 @@ class KafkaSystemAdmin( (exception, loop) => { warn("Unable to fetch last offsets for streams %s due to %s. Retrying." format (streams, exception)) debug("Exception detail:", exception) + if (metadataTTL == Long.MaxValue) { + metadataTTL = 5000 // Revert to the default cache expiration + } } ).getOrElse(throw new SamzaException("Failed to get system stream metadata")) } @@ -175,14 +182,14 @@ class KafkaSystemAdmin( * Returns the offset for the message after the specified offset for each * SystemStreamPartition that was passed in. */ - def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = { + override def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = { // This is safe to do with Kafka, even if a topic is key-deduped. If the // offset doesn't exist on a compacted topic, Kafka will return the first // message AFTER the offset that was specified in the fetch request. 
@@ -175,14 +182,14 @@ class KafkaSystemAdmin(
    * Returns the offset for the message after the specified offset for each
    * SystemStreamPartition that was passed in.
    */
-  def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = {
+  override def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = {
     // This is safe to do with Kafka, even if a topic is key-deduped. If the
     // offset doesn't exist on a compacted topic, Kafka will return the first
     // message AFTER the offset that was specified in the fetch request.
     offsets.mapValues(offset => (offset.toLong + 1).toString)
   }
 
-  def getSystemStreamMetadata(streams: java.util.Set[String]) =
+  override def getSystemStreamMetadata(streams: java.util.Set[String]) =
     getSystemStreamMetadata(streams, new ExponentialSleepStrategy(initialDelayMs = 500))
 
   /**
@@ -194,17 +201,18 @@ class KafkaSystemAdmin(
    */
   def getSystemStreamMetadata(streams: java.util.Set[String], retryBackoff: ExponentialSleepStrategy) = {
     debug("Fetching system stream metadata for: %s" format streams)
+    var metadataTTL = Long.MaxValue // Trust the cache until we get an exception
     retryBackoff.run(
       loop => {
         val metadata = TopicMetadataCache.getTopicMetadata(
           streams.toSet,
           systemName,
-          getTopicMetadata)
+          getTopicMetadata,
+          metadataTTL)
         debug("Got metadata for streams: %s" format metadata)
 
         val brokersToTopicPartitions = getTopicsAndPartitionsByBroker(metadata)
-        var partitions = Map[String, Set[Partition]]()
         var oldestOffsets = Map[SystemStreamPartition, String]()
         var newestOffsets = Map[SystemStreamPartition, String]()
         var upcomingOffsets = Map[SystemStreamPartition, String]()
@@ -215,8 +223,9 @@ class KafkaSystemAdmin(
           val consumer = new SimpleConsumer(broker.host, broker.port, timeout, bufferSize, clientId)
 
           try {
-            oldestOffsets ++= getOffsets(consumer, topicsAndPartitions, OffsetRequest.EarliestTime)
             upcomingOffsets ++= getOffsets(consumer, topicsAndPartitions, OffsetRequest.LatestTime)
+            oldestOffsets ++= getOffsets(consumer, topicsAndPartitions, OffsetRequest.EarliestTime)
+
             // Kafka's "latest" offset is always last message in stream's offset +
             // 1, so get newest message in stream by subtracting one. this is safe
             // even for key-deduplicated streams, since the last message will
@@ -251,10 +260,75 @@ class KafkaSystemAdmin(
       (exception, loop) => {
         warn("Unable to fetch last offsets for streams %s due to %s. Retrying." format (streams, exception))
         debug("Exception detail:", exception)
+        metadataTTL = 5000 // Revert to the default cache expiration
       }).getOrElse(throw new SamzaException("Failed to get system stream metadata"))
   }
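The offset arithmetic encoded in the comment above, spelled out: Kafka's "latest" offset is the upcoming offset (one past the last message), so the newest offset is latest - 1, and latest <= 0 means the partition is empty. As a tiny standalone illustration (not in the patch, just the arithmetic):

    def newestFromLatest(latest: Long): Option[String] =
      if (latest <= 0) None               // empty partition: no newest offset
      else Some((latest - 1).toString)

    assert(newestFromLatest(0) == None)      // empty topic
    assert(newestFromLatest(3) == Some("2")) // messages at offsets 0, 1, 2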
+ debug("Fetching offset for %s:%s: %s" format (broker.host, broker.port, topicAndPartition)) + val consumer = new SimpleConsumer(broker.host, broker.port, timeout, bufferSize, clientId) + try { + offset = getOffsets(consumer, Set(topicAndPartition), OffsetRequest.LatestTime).head._2 + + // Kafka's "latest" offset is always last message in stream's offset + + // 1, so get newest message in stream by subtracting one. this is safe + // even for key-deduplicated streams, since the last message will + // never be deduplicated. + if (offset.toLong <= 0) { + debug("Stripping newest offsets for %s because the topic appears empty." format topicAndPartition) + offset = null + } else { + offset = (offset.toLong - 1).toString + } + } finally { + consumer.close + } + + debug("Got offset %s for %s." format(offset, ssp)) + loop.done + }, + + (exception, loop) => { + if (retries > 0) { + warn("Exception while trying to get offset for %s: %s. Retrying." format(ssp, exception)) + metadataTTL = 0L // Force metadata refresh + retries -= 1 + } else { + warn("Exception while trying to get offset for %s" format(ssp), exception) + loop.done + throw exception + } + }) + + offset + } + + override def createCoordinatorStream(streamName: String) { info("Attempting to create coordinator stream %s." format streamName) new ExponentialSleepStrategy(initialDelayMs = 500).run( loop => { @@ -395,10 +469,11 @@ class KafkaSystemAdmin( private def validateTopicInKafka(topicName: String, numKafkaChangelogPartitions: Int) { val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy info("Validating changelog topic %s." format topicName) + var metadataTTL = Long.MaxValue // Trust the cache until we get an exception retryBackoff.run( loop => { val metadataStore = new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout) - val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topicName), systemName, metadataStore.getTopicInfo) + val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topicName), systemName, metadataStore.getTopicInfo, metadataTTL) val topicMetadata = topicMetadataMap(topicName) KafkaUtil.maybeThrowException(topicMetadata.errorCode) @@ -417,6 +492,7 @@ class KafkaSystemAdmin( case e: Exception => warn("While trying to validate topic %s: %s. Retrying." 
+  override def createCoordinatorStream(streamName: String) {
     info("Attempting to create coordinator stream %s." format streamName)
     new ExponentialSleepStrategy(initialDelayMs = 500).run(
       loop => {
@@ -395,10 +469,11 @@
   private def validateTopicInKafka(topicName: String, numKafkaChangelogPartitions: Int) {
     val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy
     info("Validating changelog topic %s." format topicName)
+    var metadataTTL = Long.MaxValue // Trust the cache until we get an exception
     retryBackoff.run(
       loop => {
         val metadataStore = new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout)
-        val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topicName), systemName, metadataStore.getTopicInfo)
+        val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topicName), systemName, metadataStore.getTopicInfo, metadataTTL)
         val topicMetadata = topicMetadataMap(topicName)
         KafkaUtil.maybeThrowException(topicMetadata.errorCode)
@@ -417,6 +492,7 @@
           case e: Exception =>
             warn("While trying to validate topic %s: %s. Retrying." format (topicName, e))
             debug("Exception detail:", e)
+            metadataTTL = 5000L // Revert to the default value
         }
       })
   }


http://git-wip-us.apache.org/repos/asf/samza/blob/f4bd84bb/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index 6c29223..f00405d 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -21,36 +21,31 @@
 package org.apache.samza.system.kafka
 
+import java.util
 import java.util.Properties
+
 import kafka.admin.AdminUtils
-import kafka.consumer.Consumer
-import kafka.consumer.ConsumerConfig
-import kafka.server.KafkaConfig
-import kafka.server.KafkaServer
-import kafka.utils.TestUtils
-import kafka.utils.TestZKUtils
-import kafka.utils.Utils
-import kafka.utils.ZKStringSerializer
+import kafka.common.{ErrorMapping, LeaderNotAvailableException}
+import kafka.consumer.{Consumer, ConsumerConfig}
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.{TestUtils, TestZKUtils, Utils, ZKStringSerializer}
 import kafka.zk.EmbeddedZookeeper
 import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.samza.Partition
-import org.apache.samza.system.SystemStreamMetadata
+import org.apache.samza.config.KafkaProducerConfig
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.ExponentialSleepStrategy
-import org.apache.samza.util.ClientUtilTopicMetadataStore
-import org.apache.samza.util.TopicMetadataStore
+import org.apache.samza.system.{SystemStreamMetadata, SystemStreamPartition}
+import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, KafkaUtil, TopicMetadataStore}
 import org.junit.Assert._
-import org.junit.{Test, BeforeClass, AfterClass}
+import org.junit._
+
 import scala.collection.JavaConversions._
-import org.apache.samza.config.KafkaProducerConfig
-import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
-import java.util
-import kafka.common.ErrorMapping
-import org.apache.samza.util.KafkaUtil
 
 object TestKafkaSystemAdmin {
+  val SYSTEM = "kafka"
   val TOPIC = "input"
+  val TOPIC2 = "input2"
   val TOTAL_PARTITIONS = 50
   val REPLICATION_FACTOR = 2
 
@@ -71,7 +66,7 @@ object TestKafkaSystemAdmin {
   val config = new util.HashMap[String, Object]()
   val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3)
   config.put("bootstrap.servers", brokers)
-  config.put("request.required.acks", "-1")
+  config.put("acks", "all")
   config.put("serializer.class", "kafka.serializer.StringEncoder")
   val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
   var producer: KafkaProducer[Array[Byte], Array[Byte]] = null
@@ -92,11 +87,11 @@ object TestKafkaSystemAdmin {
     metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
   }
 
-  def createTopic {
+  def createTopic(topicName: String, partitionCount: Int) {
     AdminUtils.createTopic(
       zkClient,
-      TOPIC,
-      TOTAL_PARTITIONS,
+      topicName,
+      partitionCount,
       REPLICATION_FACTOR)
   }
 
@@ -107,7 +102,7 @@ object TestKafkaSystemAdmin {
 
     while (!done && retries < maxRetries) {
       try {
-        val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), "kafka", metadataStore.getTopicInfo)
+        val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), SYSTEM, metadataStore.getTopicInfo)
         val topicMetadata = topicMetadataMap(topic)
         val errorCode = topicMetadata.errorCode
 
@@ -162,26 +157,26 @@ object TestKafkaSystemAdmin {
 class TestKafkaSystemAdmin {
   import TestKafkaSystemAdmin._
 
-  val systemName = "test"
   // Provide a random zkAddress, the system admin tries to connect only when a topic is created/validated
-  val systemAdmin = new KafkaSystemAdmin(systemName, brokers, connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer))
+  val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokers, connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer))
+
   @Test
   def testShouldAssembleMetadata {
     val oldestOffsets = Map(
-      new SystemStreamPartition("test", "stream1", new Partition(0)) -> "o1",
-      new SystemStreamPartition("test", "stream2", new Partition(0)) -> "o2",
-      new SystemStreamPartition("test", "stream1", new Partition(1)) -> "o3",
-      new SystemStreamPartition("test", "stream2", new Partition(1)) -> "o4")
+      new SystemStreamPartition(SYSTEM, "stream1", new Partition(0)) -> "o1",
+      new SystemStreamPartition(SYSTEM, "stream2", new Partition(0)) -> "o2",
+      new SystemStreamPartition(SYSTEM, "stream1", new Partition(1)) -> "o3",
+      new SystemStreamPartition(SYSTEM, "stream2", new Partition(1)) -> "o4")
     val newestOffsets = Map(
-      new SystemStreamPartition("test", "stream1", new Partition(0)) -> "n1",
-      new SystemStreamPartition("test", "stream2", new Partition(0)) -> "n2",
-      new SystemStreamPartition("test", "stream1", new Partition(1)) -> "n3",
-      new SystemStreamPartition("test", "stream2", new Partition(1)) -> "n4")
+      new SystemStreamPartition(SYSTEM, "stream1", new Partition(0)) -> "n1",
+      new SystemStreamPartition(SYSTEM, "stream2", new Partition(0)) -> "n2",
+      new SystemStreamPartition(SYSTEM, "stream1", new Partition(1)) -> "n3",
+      new SystemStreamPartition(SYSTEM, "stream2", new Partition(1)) -> "n4")
     val upcomingOffsets = Map(
-      new SystemStreamPartition("test", "stream1", new Partition(0)) -> "u1",
-      new SystemStreamPartition("test", "stream2", new Partition(0)) -> "u2",
-      new SystemStreamPartition("test", "stream1", new Partition(1)) -> "u3",
-      new SystemStreamPartition("test", "stream2", new Partition(1)) -> "u4")
+      new SystemStreamPartition(SYSTEM, "stream1", new Partition(0)) -> "u1",
+      new SystemStreamPartition(SYSTEM, "stream2", new Partition(0)) -> "u2",
+      new SystemStreamPartition(SYSTEM, "stream1", new Partition(1)) -> "u3",
+      new SystemStreamPartition(SYSTEM, "stream2", new Partition(1)) -> "u4")
     val metadata = KafkaSystemAdmin.assembleMetadata(oldestOffsets, newestOffsets, upcomingOffsets)
     assertNotNull(metadata)
     assertEquals(2, metadata.size)
@@ -208,7 +203,7 @@ class TestKafkaSystemAdmin {
   @Test
   def testShouldGetOldestNewestAndNextOffsets {
     // Create an empty topic with 50 partitions, but with no offsets.
-    createTopic
+    createTopic(TOPIC, 50)
     validateTopic(TOPIC, 50)
 
     // Verify the empty topic behaves as expected.
@@ -290,7 +285,7 @@ class TestKafkaSystemAdmin {
   @Test
   def testShouldCreateCoordinatorStream {
     val topic = "test-coordinator-stream"
-    val systemAdmin = new KafkaSystemAdmin("test", brokers, () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer), coordinatorStreamReplicationFactor = 3)
+    val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokers, () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer), coordinatorStreamReplicationFactor = 3)
     systemAdmin.createCoordinatorStream(topic)
     validateTopic(topic, 1)
     val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), "kafka", metadataStore.getTopicInfo)
@@ -301,11 +296,12 @@ class TestKafkaSystemAdmin {
     assertEquals(3, partitionMetadata.replicas.size)
   }
 
-  class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin("test", brokers, () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)) {
-    import kafka.api.{ TopicMetadata, TopicMetadataResponse }
-
+  class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin(SYSTEM, brokers, () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)) {
+    import kafka.api.TopicMetadata
+    var metadataCallCount = 0
     // Simulate Kafka telling us that the leader for the topic is not available
     override def getTopicMetadata(topics: Set[String]) = {
+      metadataCallCount += 1
       val topicMetadata = TopicMetadata(topic = "quux", partitionsMetadata = Seq(), errorCode = ErrorMapping.LeaderNotAvailableCode)
       Map("quux" -> topicMetadata)
     }
@@ -322,4 +318,45 @@ class TestKafkaSystemAdmin {
       case e: ExponentialSleepStrategy.CallLimitReached => ()
     }
   }
+
+  @Test
+  def testGetNewestOffset {
+    createTopic(TOPIC2, 16)
+    validateTopic(TOPIC2, 16)
+
+    val sspUnderTest = new SystemStreamPartition("kafka", TOPIC2, new Partition(4))
+    val otherSsp = new SystemStreamPartition("kafka", TOPIC2, new Partition(13))
+
+    assertNull(systemAdmin.getNewestOffset(sspUnderTest, 3))
+    assertNull(systemAdmin.getNewestOffset(otherSsp, 3))
+
+    // Add a new message to one of the partitions, and verify that it works as expected.
+ assertEquals("0", producer.send(new ProducerRecord(TOPIC2, 4, "key1".getBytes, "val1".getBytes)).get().offset().toString) + assertEquals("0", systemAdmin.getNewestOffset(sspUnderTest, 3)) + assertNull(systemAdmin.getNewestOffset(otherSsp, 3)) + + // Again + assertEquals("1", producer.send(new ProducerRecord(TOPIC2, 4, "key2".getBytes, "val2".getBytes)).get().offset().toString) + assertEquals("1", systemAdmin.getNewestOffset(sspUnderTest, 3)) + assertNull(systemAdmin.getNewestOffset(otherSsp, 3)) + + // Add a message to both partitions + assertEquals("2", producer.send(new ProducerRecord(TOPIC2, 4, "key3".getBytes, "val3".getBytes)).get().offset().toString) + assertEquals("0", producer.send(new ProducerRecord(TOPIC2, 13, "key4".getBytes, "val4".getBytes)).get().offset().toString) + assertEquals("2", systemAdmin.getNewestOffset(sspUnderTest, 0)) + assertEquals("0", systemAdmin.getNewestOffset(otherSsp, 0)) + } + + @Test (expected = classOf[LeaderNotAvailableException]) + def testGetNewestOffsetMaxRetry { + val expectedRetryCount = 3 + val systemAdmin = new KafkaSystemAdminWithTopicMetadataError + try { + systemAdmin.getNewestOffset(new SystemStreamPartition(SYSTEM, "quux", new Partition(0)), 3) + } catch { + case e: Exception => + assertEquals(expectedRetryCount + 1, systemAdmin.metadataCallCount) + throw e + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/f4bd84bb/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala index f0965ae..38c8fa0 100644 --- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala @@ -196,8 +196,9 @@ class RocksDbKeyValueStore( def flush { metrics.flushes.inc - trace("Flushing.") + trace("Flushing store: %s" format storeName) db.flush(flushOptions) + trace("Flushed store: %s" format storeName) } def close() { http://git-wip-us.apache.org/repos/asf/samza/blob/f4bd84bb/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala index ae6717d..e7e4ede 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala @@ -185,11 +185,12 @@ class CachedStore[K, V]( } override def flush() { - trace("Flushing.") - + trace("Purging dirty entries from CachedStore.") metrics.flushes.inc putAllDirtyEntries() + trace("Flushing store.") store.flush() + trace("Flushed store.") } private def putAllDirtyEntries() { http://git-wip-us.apache.org/repos/asf/samza/blob/f4bd84bb/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala index 7bba6ff..dc5cbcd 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala +++ 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala @@ -100,11 +100,12 @@ class LoggedStore[K, V]( } def flush { - trace("Flushing.") + trace("Flushing store.") metrics.flushes.inc store.flush + trace("Flushed store.") } def close { http://git-wip-us.apache.org/repos/asf/samza/blob/f4bd84bb/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala index 8e183ef..d77d476 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala @@ -110,11 +110,12 @@ class SerializedKeyValueStore[K, V]( } def flush { - trace("Flushing.") + trace("Flushing store.") metrics.flushes.inc store.flush + trace("Flushed store.") } def close {
