Repository: samza
Updated Branches:
  refs/heads/master 725a52603 -> f4bd84bbb


SAMZA-964 - Improve the performance of the continuous OFFSET checkpointing for logged stores


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f4bd84bb
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f4bd84bb
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f4bd84bb

Branch: refs/heads/master
Commit: f4bd84bbb7fe441355392d7ef0e920bb0b794f96
Parents: 725a526
Author: Jacob Maes <[email protected]>
Authored: Tue Jun 14 16:42:52 2016 -0700
Committer: Navina Ramesh <[email protected]>
Committed: Tue Jun 14 16:42:52 2016 -0700

----------------------------------------------------------------------
 .../samza/system/ExtendedSystemAdmin.java       |   5 +-
 .../stream/CoordinatorStreamSystemConsumer.java |   1 +
 .../samza/storage/TaskStorageManager.scala      |  60 +++++----
 .../samza/system/StreamMetadataCache.scala      |   2 +-
 .../samza/coordinator/TestJobCoordinator.scala  |   7 +-
 .../samza/storage/TestTaskStorageManager.scala  |  38 ++++++
 .../samza/system/kafka/KafkaSystemAdmin.scala   | 102 +++++++++++++--
 .../system/kafka/TestKafkaSystemAdmin.scala     | 123 ++++++++++++-------
 .../samza/storage/kv/RocksDbKeyValueStore.scala |   3 +-
 .../apache/samza/storage/kv/CachedStore.scala   |   5 +-
 .../apache/samza/storage/kv/LoggedStore.scala   |   3 +-
 .../storage/kv/SerializedKeyValueStore.scala    |   3 +-
 12 files changed, 267 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/f4bd84bb/samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java
index daa2212..ac5b1aa 100644
--- a/samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java
@@ -26,5 +26,8 @@ import java.util.Set;
  * TODO: Merge this interface method with SystemAdmin when we upgrade to JDK 1.8
  */
 public interface ExtendedSystemAdmin extends SystemAdmin {
-  Map<String, SystemStreamMetadata> getSystemStreamPartitionCounts(Set<String> streamNames);
+  Map<String, SystemStreamMetadata> getSystemStreamPartitionCounts(Set<String> streamNames, long cacheTTL);
+
+  // Makes fewer offset requests than getSystemStreamMetadata
+  String getNewestOffset(SystemStreamPartition ssp, Integer maxRetries);
 }
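
For context on how callers are expected to use this split interface: both TaskStorageManager and StreamMetadataCache in this commit check for the extended capability and fall back to the full metadata fetch otherwise. A minimal caller-side sketch (illustrative only, not part of the commit; admin, ssp, and streams are assumed to be in scope):

    // Hypothetical caller. Assumed in scope: admin: SystemAdmin,
    // ssp: SystemStreamPartition, streams: java.util.Set[String].
    val newestOffset: String = admin match {
      case extended: ExtendedSystemAdmin =>
        // Fast path: one targeted offset request, retried up to 3 times.
        extended.getNewestOffset(ssp, 3)
      case plain =>
        // Generic path: fetch offsets for every partition of the stream,
        // then pick out the single partition we actually need.
        plain.getSystemStreamMetadata(streams)
          .get(ssp.getStream)
          .getSystemStreamPartitionMetadata
          .get(ssp.getPartition)
          .getNewestOffset
    }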

http://git-wip-us.apache.org/repos/asf/samza/blob/f4bd84bb/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
index 0a6661c..c343865 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
@@ -91,6 +91,7 @@ public class CoordinatorStreamSystemConsumer {
     String streamName = coordinatorSystemStreamPartition.getStream();
     streamNames.add(streamName);
    Map<String, SystemStreamMetadata> systemStreamMetadataMap = systemAdmin.getSystemStreamMetadata(streamNames);
+    log.info(String.format("Got metadata %s", systemStreamMetadataMap.toString()));
 
     if (systemStreamMetadataMap == null) {
      throw new SamzaException("Received a null systemStreamMetadataMap from the systemAdmin. This is illegal.");

http://git-wip-us.apache.org/repos/asf/samza/blob/f4bd84bb/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index c7b0520..2a3535e 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -218,30 +218,48 @@ class TaskStorageManager(
     */
   private def flushChangelogOffsetFiles() {
     debug("Persisting logged key value stores")
-    changeLogSystemStreams.foreach { case (store, systemStream) => {
-      val streamToMetadata = systemAdmins(systemStream.getSystem)
-              .getSystemStreamMetadata(JavaConversions.setAsJavaSet(Set(systemStream.getStream)))
-      val sspMetadata = streamToMetadata
-              .get(systemStream.getStream)
-              .getSystemStreamPartitionMetadata
-              .get(partition)
-      val newestOffset = sspMetadata.getNewestOffset
-
-      if (newestOffset != null) {
-        val offsetFile = new File(TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, store, taskName), offsetFileName)
-
-        try {
+
+    for ((storeName, systemStream) <- changeLogSystemStreams) {
+      val systemAdmin = systemAdmins
+              .getOrElse(systemStream.getSystem,
+                         throw new SamzaException("Unable to get systemAdmin for store " + storeName + " and systemStream" + systemStream))
+
+      debug("Fetching newest offset for store %s" format(storeName))
+      try {
+        val newestOffset = if (systemAdmin.isInstanceOf[ExtendedSystemAdmin]) {
+          // This approach is much more efficient because it only fetches the newest offset for 1 SSP
+          // rather than newest and oldest offsets for all SSPs. Use it if we can.
+          systemAdmin.asInstanceOf[ExtendedSystemAdmin].getNewestOffset(new SystemStreamPartition(systemStream.getSystem, systemStream.getStream, partition), 3)
+        } else {
+          val streamToMetadata = systemAdmins(systemStream.getSystem)
+                  .getSystemStreamMetadata(JavaConversions.setAsJavaSet(Set(systemStream.getStream)))
+          val sspMetadata = streamToMetadata
+                  .get(systemStream.getStream)
+                  .getSystemStreamPartitionMetadata
+                  .get(partition)
+          sspMetadata.getNewestOffset
+        }
+        debug("Got offset %s for store %s" format(newestOffset, storeName))
+
+        val offsetFile = new File(TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName), offsetFileName)
+        if (newestOffset != null) {
+          debug("Storing offset for store in OFFSET file ")
           Util.writeDataToFile(offsetFile, newestOffset)
-          debug("Successfully stored offset %s for store %s in OFFSET file " format(newestOffset, store))
-        } catch {
-          case e: Exception => error("Exception storing offset %s for store %s" format(newestOffset, store), e)
+          debug("Successfully stored offset %s for store %s in OFFSET file " format(newestOffset, storeName))
+        } else {
+          //if newestOffset is null, then it means the store is (or has become) empty. No need to persist the offset file
+          if (offsetFile.exists()) {
+            Util.rm(offsetFile)
+          }
+          debug("Not storing OFFSET file for taskName %s. Store %s backed by changelog topic : %s, partition: %s is empty. " format (taskName, storeName, systemStream.getStream, partition.getPartitionId))
         }
+      } catch {
+        case e: Exception => error("Exception storing offset for store %s. Skipping." format(storeName), e)
       }
-      else {
-        //if newestOffset is null, then it means the store is empty. No need to persist the offset file
-        debug("Not storing OFFSET file for taskName %s. Store %s backed by changelog topic : %s, partition: %s is empty. " format (taskName, store, systemStream.getStream, partition.getPartitionId))
-      }
-    }}
+
+    }
+
+    debug("Done persisting logged key value stores")
   }
 
   /**
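
To make the efficiency comment in the new code concrete: a changelog topic can have many partitions (the Kafka tests below use 50), and the generic getSystemStreamMetadata() path fetches the earliest and latest offsets for every one of them, even though flushChangelogOffsetFiles() only needs the newest offset of the single partition owned by this task. The ExtendedSystemAdmin path issues one LatestTime offset request for that one partition instead, and because this method runs on every flush, the saving applies continuously.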

http://git-wip-us.apache.org/repos/asf/samza/blob/f4bd84bb/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
index 18b47ec..918fa53 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
@@ -64,7 +64,7 @@ class StreamMetadataCache (
          val systemAdmin = systemAdmins
            .getOrElse(systemName, throw new SamzaException("Cannot get metadata for unknown system: %s" format systemName))
          val streamToMetadata = if (partitionsMetadataOnly && systemAdmin.isInstanceOf[ExtendedSystemAdmin]) {
-            systemAdmin.asInstanceOf[ExtendedSystemAdmin].getSystemStreamPartitionCounts(systemStreams.map(_.getStream))
+            systemAdmin.asInstanceOf[ExtendedSystemAdmin].getSystemStreamPartitionCounts(systemStreams.map(_.getStream), cacheTTLms)
           } else {
             systemAdmin.getSystemStreamMetadata(systemStreams.map(_.getStream))
           }
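
The cacheTTLms argument threaded through here connects this cache to the TTL handling added in KafkaSystemAdmin below: the admin starts out trusting cached topic metadata indefinitely (Long.MaxValue) and drops the TTL back to the 5000 ms default only after a fetch fails, so a healthy job stops re-requesting metadata it already holds.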

http://git-wip-us.apache.org/repos/asf/samza/blob/f4bd84bb/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index ffdb006..55a879b 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -231,7 +231,8 @@ class MockSystemAdmin extends ExtendedSystemAdmin {
   
   override def offsetComparator(offset1: String, offset2: String) = null
 
-  override def getSystemStreamPartitionCounts(streamNames: util.Set[String]): util.Map[String, SystemStreamMetadata] = {
+  override def getSystemStreamPartitionCounts(streamNames: util.Set[String],
+                                              cacheTTL: Long): util.Map[String, SystemStreamMetadata] = {
     assertEquals(1, streamNames.size())
     val result = streamNames.map {
       stream =>
@@ -244,4 +245,8 @@ class MockSystemAdmin extends ExtendedSystemAdmin {
     }.toMap
     result
   }
+
+  override def getNewestOffset(ssp: SystemStreamPartition, maxRetries: Integer) = null
+
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/f4bd84bb/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
index c8ea64c..e126481 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
@@ -122,6 +122,10 @@ class TestTaskStorageManager extends MockitoSugar {
    assertEquals("Found incorrect value in offset file!", "100", Util.readDataFromFile(offsetFilePath))
   }
 
+  /**
+    * For instances of SystemAdmin, the store manager should call the slow getSystemStreamMetadata() method
+    * which gets offsets for ALL n partitions of the changelog, regardless of how many we need for the current task.
+    */
   @Test
   def testFlushCreatesOffsetFileForLoggedStore() {
     val partition = new Partition(0)
@@ -148,6 +152,40 @@ class TestTaskStorageManager extends MockitoSugar {
    assertEquals("Found incorrect value in offset file!", "100", Util.readDataFromFile(offsetFilePath))
   }
 
+  /**
+    * For instances of ExtendedSystemAdmin, the store manager should call the optimized getNewestOffset() method.
+    * Flush should also delete the existing OFFSET file if the changelog partition (for some reason) becomes empty
+    */
+  @Test
+  def testFlushCreatesOffsetFileForLoggedStoreExtendedSystemAdmin() {
+    val partition = new Partition(0)
+
+    val offsetFilePath = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + File.separator + "OFFSET")
+
+    val mockSystemAdmin = mock[ExtendedSystemAdmin]
+    when(mockSystemAdmin.getNewestOffset(any(classOf[SystemStreamPartition]), anyInt())).thenReturn("100").thenReturn(null)
+
+    //Build TaskStorageManager
+    val taskStorageManager = new TaskStorageManagerBuilder()
+            .addStore(loggedStore)
+            .setSystemAdmin("kafka", mockSystemAdmin)
+            .setPartition(partition)
+            .build
+
+    //Invoke test method
+    taskStorageManager.flush()
+
+    //Check conditions
+    assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
+    assertEquals("Found incorrect value in offset file!", "100", Util.readDataFromFile(offsetFilePath))
+
+    //Invoke test method again
+    taskStorageManager.flush()
+
+    //Check conditions
+    assertFalse("Offset file for null offset exists!", offsetFilePath.exists())
+  }
+
   @Test
   def testFlushOverwritesOffsetFileForLoggedStore() {
     val partition = new Partition(0)

http://git-wip-us.apache.org/repos/asf/samza/blob/f4bd84bb/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index 23aa58d..ba8de5c 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -32,7 +32,7 @@ import kafka.common.{ TopicExistsException, TopicAndPartition }
 import java.util.{ Properties, UUID }
 import scala.collection.JavaConversions
 import scala.collection.JavaConversions._
-import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.system.SystemStreamMetadata.{OffsetType, SystemStreamPartitionMetadata}
 import kafka.consumer.ConsumerConfig
 import kafka.admin.AdminUtils
 import org.apache.samza.util.KafkaUtil
@@ -64,7 +64,8 @@ object KafkaSystemAdmin extends Logging {
       }
       .toMap
 
-    info("Got metadata: %s" format allMetadata)
+    // This is typically printed downstream and it can be spammy, so debug level here.
+    debug("Got metadata: %s" format allMetadata)
 
     allMetadata
   }
@@ -72,6 +73,7 @@ object KafkaSystemAdmin extends Logging {
 
 /**
 * A helper class that is used to construct the changelog stream specific information
+ *
  * @param replicationFactor The number of replicas for the changelog stream
 * @param kafkaProps The kafka specific properties that need to be used for changelog stream creation
  */
@@ -139,18 +141,20 @@ class KafkaSystemAdmin(
 
   import KafkaSystemAdmin._
 
-  def getSystemStreamPartitionCounts(streams: util.Set[String]): util.Map[String, SystemStreamMetadata] = {
-    getSystemStreamPartitionCounts(streams, new ExponentialSleepStrategy(initialDelayMs = 500))
+  override def getSystemStreamPartitionCounts(streams: util.Set[String], cacheTTL: Long): util.Map[String, SystemStreamMetadata] = {
+    getSystemStreamPartitionCounts(streams, new ExponentialSleepStrategy(initialDelayMs = 500), cacheTTL)
   }
 
-  def getSystemStreamPartitionCounts(streams: util.Set[String], retryBackoff: ExponentialSleepStrategy): util.Map[String, SystemStreamMetadata] = {
+  def getSystemStreamPartitionCounts(streams: util.Set[String], retryBackoff: ExponentialSleepStrategy, cacheTTL: Long = Long.MaxValue): util.Map[String, SystemStreamMetadata] = {
     debug("Fetching system stream partition count for: %s" format streams)
+    var metadataTTL = cacheTTL
     retryBackoff.run(
       loop => {
         val metadata = TopicMetadataCache.getTopicMetadata(
           streams.toSet,
           systemName,
-        getTopicMetadata)
+          getTopicMetadata,
+          metadataTTL)
         val result = metadata.map {
           case (topic, topicMetadata) => {
             val partitionsMap = topicMetadata.partitionsMetadata.map {
@@ -167,6 +171,9 @@ class KafkaSystemAdmin(
       (exception, loop) => {
         warn("Unable to fetch last offsets for streams %s due to %s. 
Retrying." format (streams, exception))
         debug("Exception detail:", exception)
+        if (metadataTTL == Long.MaxValue) {
+          metadataTTL = 5000 // Revert to the default cache expiration
+        }
       }
    ).getOrElse(throw new SamzaException("Failed to get system stream metadata"))
   }
@@ -175,14 +182,14 @@ class KafkaSystemAdmin(
    * Returns the offset for the message after the specified offset for each
    * SystemStreamPartition that was passed in.
    */
-  def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = {
+  override def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = {
     // This is safe to do with Kafka, even if a topic is key-deduped. If the 
     // offset doesn't exist on a compacted topic, Kafka will return the first 
     // message AFTER the offset that was specified in the fetch request.
     offsets.mapValues(offset => (offset.toLong + 1).toString)
   }
 
-  def getSystemStreamMetadata(streams: java.util.Set[String]) =
+  override def getSystemStreamMetadata(streams: java.util.Set[String]) =
    getSystemStreamMetadata(streams, new ExponentialSleepStrategy(initialDelayMs = 500))
 
   /**
@@ -194,17 +201,18 @@ class KafkaSystemAdmin(
    */
  def getSystemStreamMetadata(streams: java.util.Set[String], retryBackoff: ExponentialSleepStrategy) = {
     debug("Fetching system stream metadata for: %s" format streams)
+    var metadataTTL = Long.MaxValue // Trust the cache until we get an exception
     retryBackoff.run(
       loop => {
         val metadata = TopicMetadataCache.getTopicMetadata(
           streams.toSet,
           systemName,
-          getTopicMetadata)
+          getTopicMetadata,
+          metadataTTL)
 
         debug("Got metadata for streams: %s" format metadata)
 
         val brokersToTopicPartitions = getTopicsAndPartitionsByBroker(metadata)
-        var partitions = Map[String, Set[Partition]]()
         var oldestOffsets = Map[SystemStreamPartition, String]()
         var newestOffsets = Map[SystemStreamPartition, String]()
         var upcomingOffsets = Map[SystemStreamPartition, String]()
@@ -215,8 +223,9 @@ class KafkaSystemAdmin(
 
          val consumer = new SimpleConsumer(broker.host, broker.port, timeout, bufferSize, clientId)
           try {
-            oldestOffsets ++= getOffsets(consumer, topicsAndPartitions, OffsetRequest.EarliestTime)
             upcomingOffsets ++= getOffsets(consumer, topicsAndPartitions, OffsetRequest.LatestTime)
+            oldestOffsets ++= getOffsets(consumer, topicsAndPartitions, OffsetRequest.EarliestTime)
+
            // Kafka's "latest" offset is always last message in stream's offset +
            // 1, so get newest message in stream by subtracting one. this is safe
             // even for key-deduplicated streams, since the last message will
@@ -251,10 +260,75 @@ class KafkaSystemAdmin(
       (exception, loop) => {
         warn("Unable to fetch last offsets for streams %s due to %s. 
Retrying." format (streams, exception))
         debug("Exception detail:", exception)
+        metadataTTL = 5000 // Revert to the default cache expiration
      }).getOrElse(throw new SamzaException("Failed to get system stream metadata"))
   }
 
-  def createCoordinatorStream(streamName: String) {
+  /**
+    * Returns the newest offset for the specified SSP.
+    * This method is fast and targeted. It minimizes the number of kafka requests.
+    * It does not retry indefinitely if there is any failure.
+    * It returns null if the topic is empty. To get the offsets for *all*
+    * partitions, it would be more efficient to call getSystemStreamMetadata
+    */
+  override def getNewestOffset(ssp: SystemStreamPartition, maxRetries: Integer) = {
+    debug("Fetching newest offset for: %s" format ssp)
+    var offset: String = null
+    var metadataTTL = Long.MaxValue // Trust the cache until we get an exception
+    var retries = maxRetries
+    new ExponentialSleepStrategy().run(
+      loop => {
+        val metadata = TopicMetadataCache.getTopicMetadata(
+          Set(ssp.getStream),
+          systemName,
+          getTopicMetadata,
+          metadataTTL)
+        debug("Got metadata for streams: %s" format metadata)
+
+        val brokersToTopicPartitions = getTopicsAndPartitionsByBroker(metadata)
+        val topicAndPartition = new TopicAndPartition(ssp.getStream, ssp.getPartition.getPartitionId)
+        val broker = brokersToTopicPartitions.filter((e) => e._2.contains(topicAndPartition)).head._1
+
+        // Fetch the latest offset for the one topic partition we care about.
+        debug("Fetching offset for %s:%s: %s" format (broker.host, broker.port, topicAndPartition))
+        val consumer = new SimpleConsumer(broker.host, broker.port, timeout, bufferSize, clientId)
+        try {
+          offset = getOffsets(consumer, Set(topicAndPartition), OffsetRequest.LatestTime).head._2
+
+          // Kafka's "latest" offset is always last message in stream's offset +
+          // 1, so get newest message in stream by subtracting one. this is safe
+          // even for key-deduplicated streams, since the last message will
+          // never be deduplicated.
+          if (offset.toLong <= 0) {
+            debug("Stripping newest offsets for %s because the topic appears 
empty." format topicAndPartition)
+            offset = null
+          } else {
+            offset = (offset.toLong - 1).toString
+          }
+        } finally {
+          consumer.close
+        }
+
+        debug("Got offset %s for %s." format(offset, ssp))
+        loop.done
+      },
+
+      (exception, loop) => {
+        if (retries > 0) {
+          warn("Exception while trying to get offset for %s: %s. Retrying." 
format(ssp, exception))
+          metadataTTL = 0L // Force metadata refresh
+          retries -= 1
+        } else {
+          warn("Exception while trying to get offset for %s" format(ssp), 
exception)
+          loop.done
+          throw exception
+        }
+      })
+
+    offset
+  }
+
+  override def createCoordinatorStream(streamName: String) {
     info("Attempting to create coordinator stream %s." format streamName)
     new ExponentialSleepStrategy(initialDelayMs = 500).run(
       loop => {
@@ -395,10 +469,11 @@ class KafkaSystemAdmin(
   private def validateTopicInKafka(topicName: String, numKafkaChangelogPartitions: Int) {
     val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy
     info("Validating changelog topic %s." format topicName)
+    var metadataTTL = Long.MaxValue // Trust the cache until we get an exception
     retryBackoff.run(
       loop => {
        val metadataStore = new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout)
-        val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topicName), systemName, metadataStore.getTopicInfo)
+        val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topicName), systemName, metadataStore.getTopicInfo, metadataTTL)
         val topicMetadata = topicMetadataMap(topicName)
         KafkaUtil.maybeThrowException(topicMetadata.errorCode)
 
@@ -417,6 +492,7 @@ class KafkaSystemAdmin(
           case e: Exception =>
             warn("While trying to validate topic %s: %s. Retrying." format 
(topicName, e))
             debug("Exception detail:", e)
+            metadataTTL = 5000L // Revert to the default value
         }
       })
   }
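
A quick worked example of the offset arithmetic in getNewestOffset(): for a partition holding messages at offsets 0, 1, and 2, the OffsetRequest.LatestTime fetch returns the upcoming offset 3, so the newest message is 3 - 1 = 2; for an empty partition it returns 0, the offset.toLong <= 0 guard turns that into null, and that null is what lets TaskStorageManager delete a stale OFFSET file. On failure, the error handler forces a metadata refresh (metadataTTL = 0L) and decrements the retry budget, so a call with maxRetries = 3 makes at most four attempts before rethrowing.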

http://git-wip-us.apache.org/repos/asf/samza/blob/f4bd84bb/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index 6c29223..f00405d 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -21,36 +21,31 @@
 
 package org.apache.samza.system.kafka
 
+import java.util
 import java.util.Properties
+
 import kafka.admin.AdminUtils
-import kafka.consumer.Consumer
-import kafka.consumer.ConsumerConfig
-import kafka.server.KafkaConfig
-import kafka.server.KafkaServer
-import kafka.utils.TestUtils
-import kafka.utils.TestZKUtils
-import kafka.utils.Utils
-import kafka.utils.ZKStringSerializer
+import kafka.common.{ErrorMapping, LeaderNotAvailableException}
+import kafka.consumer.{Consumer, ConsumerConfig}
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.{TestUtils, TestZKUtils, Utils, ZKStringSerializer}
 import kafka.zk.EmbeddedZookeeper
 import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.samza.Partition
-import org.apache.samza.system.SystemStreamMetadata
+import org.apache.samza.config.KafkaProducerConfig
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.ExponentialSleepStrategy
-import org.apache.samza.util.ClientUtilTopicMetadataStore
-import org.apache.samza.util.TopicMetadataStore
+import org.apache.samza.system.{SystemStreamMetadata, SystemStreamPartition}
+import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, KafkaUtil, TopicMetadataStore}
 import org.junit.Assert._
-import org.junit.{Test, BeforeClass, AfterClass}
+import org.junit._
+
 import scala.collection.JavaConversions._
-import org.apache.samza.config.KafkaProducerConfig
-import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
-import java.util
-import kafka.common.ErrorMapping
-import org.apache.samza.util.KafkaUtil
 
 object TestKafkaSystemAdmin {
+  val SYSTEM = "kafka"
   val TOPIC = "input"
+  val TOPIC2 = "input2"
   val TOTAL_PARTITIONS = 50
   val REPLICATION_FACTOR = 2
 
@@ -71,7 +66,7 @@ object TestKafkaSystemAdmin {
   val config = new util.HashMap[String, Object]()
  val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3)
   config.put("bootstrap.servers", brokers)
-  config.put("request.required.acks", "-1")
+  config.put("acks", "all")
   config.put("serializer.class", "kafka.serializer.StringEncoder")
   val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
   var producer: KafkaProducer[Array[Byte], Array[Byte]] = null
@@ -92,11 +87,11 @@ object TestKafkaSystemAdmin {
     metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
   }
 
-  def createTopic {
+  def createTopic(topicName: String, partitionCount: Int) {
     AdminUtils.createTopic(
       zkClient,
-      TOPIC,
-      TOTAL_PARTITIONS,
+      topicName,
+      partitionCount,
       REPLICATION_FACTOR)
   }
 
@@ -107,7 +102,7 @@ object TestKafkaSystemAdmin {
 
     while (!done && retries < maxRetries) {
       try {
-        val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), "kafka", metadataStore.getTopicInfo)
+        val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), SYSTEM, metadataStore.getTopicInfo)
         val topicMetadata = topicMetadataMap(topic)
         val errorCode = topicMetadata.errorCode
 
@@ -162,26 +157,26 @@ object TestKafkaSystemAdmin {
 class TestKafkaSystemAdmin {
   import TestKafkaSystemAdmin._
 
-  val systemName = "test"
   // Provide a random zkAddress, the system admin tries to connect only when a topic is created/validated
-  val systemAdmin = new KafkaSystemAdmin(systemName, brokers, connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer))
+  val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokers, connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer))
 
+  @Test
   def testShouldAssembleMetadata {
     val oldestOffsets = Map(
-      new SystemStreamPartition("test", "stream1", new Partition(0)) -> "o1",
-      new SystemStreamPartition("test", "stream2", new Partition(0)) -> "o2",
-      new SystemStreamPartition("test", "stream1", new Partition(1)) -> "o3",
-      new SystemStreamPartition("test", "stream2", new Partition(1)) -> "o4")
+      new SystemStreamPartition(SYSTEM, "stream1", new Partition(0)) -> "o1",
+      new SystemStreamPartition(SYSTEM, "stream2", new Partition(0)) -> "o2",
+      new SystemStreamPartition(SYSTEM, "stream1", new Partition(1)) -> "o3",
+      new SystemStreamPartition(SYSTEM, "stream2", new Partition(1)) -> "o4")
     val newestOffsets = Map(
-      new SystemStreamPartition("test", "stream1", new Partition(0)) -> "n1",
-      new SystemStreamPartition("test", "stream2", new Partition(0)) -> "n2",
-      new SystemStreamPartition("test", "stream1", new Partition(1)) -> "n3",
-      new SystemStreamPartition("test", "stream2", new Partition(1)) -> "n4")
+      new SystemStreamPartition(SYSTEM, "stream1", new Partition(0)) -> "n1",
+      new SystemStreamPartition(SYSTEM, "stream2", new Partition(0)) -> "n2",
+      new SystemStreamPartition(SYSTEM, "stream1", new Partition(1)) -> "n3",
+      new SystemStreamPartition(SYSTEM, "stream2", new Partition(1)) -> "n4")
     val upcomingOffsets = Map(
-      new SystemStreamPartition("test", "stream1", new Partition(0)) -> "u1",
-      new SystemStreamPartition("test", "stream2", new Partition(0)) -> "u2",
-      new SystemStreamPartition("test", "stream1", new Partition(1)) -> "u3",
-      new SystemStreamPartition("test", "stream2", new Partition(1)) -> "u4")
+      new SystemStreamPartition(SYSTEM, "stream1", new Partition(0)) -> "u1",
+      new SystemStreamPartition(SYSTEM, "stream2", new Partition(0)) -> "u2",
+      new SystemStreamPartition(SYSTEM, "stream1", new Partition(1)) -> "u3",
+      new SystemStreamPartition(SYSTEM, "stream2", new Partition(1)) -> "u4")
     val metadata = KafkaSystemAdmin.assembleMetadata(oldestOffsets, newestOffsets, upcomingOffsets)
     assertNotNull(metadata)
     assertEquals(2, metadata.size)
@@ -208,7 +203,7 @@ class TestKafkaSystemAdmin {
   @Test
   def testShouldGetOldestNewestAndNextOffsets {
     // Create an empty topic with 50 partitions, but with no offsets.
-    createTopic
+    createTopic(TOPIC, 50)
     validateTopic(TOPIC, 50)
 
     // Verify the empty topic behaves as expected.
@@ -290,7 +285,7 @@ class TestKafkaSystemAdmin {
   @Test
   def testShouldCreateCoordinatorStream {
     val topic = "test-coordinator-stream"
-    val systemAdmin = new KafkaSystemAdmin("test", brokers, () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer), coordinatorStreamReplicationFactor = 3)
+    val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokers, () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer), coordinatorStreamReplicationFactor = 3)
     systemAdmin.createCoordinatorStream(topic)
     validateTopic(topic, 1)
     val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), "kafka", metadataStore.getTopicInfo)
@@ -301,11 +296,12 @@ class TestKafkaSystemAdmin {
     assertEquals(3, partitionMetadata.replicas.size)
   }
 
-  class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin("test", brokers, () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)) {
-    import kafka.api.{ TopicMetadata, TopicMetadataResponse }
-
+  class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin(SYSTEM, brokers, () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)) {
+    import kafka.api.TopicMetadata
+    var metadataCallCount = 0
     // Simulate Kafka telling us that the leader for the topic is not available
     override def getTopicMetadata(topics: Set[String]) = {
+      metadataCallCount += 1
      val topicMetadata = TopicMetadata(topic = "quux", partitionsMetadata = Seq(), errorCode = ErrorMapping.LeaderNotAvailableCode)
       Map("quux" -> topicMetadata)
     }
@@ -322,4 +318,45 @@ class TestKafkaSystemAdmin {
       case e: ExponentialSleepStrategy.CallLimitReached => ()
     }
   }
+
+  @Test
+  def testGetNewestOffset {
+    createTopic(TOPIC2, 16)
+    validateTopic(TOPIC2, 16)
+
+    val sspUnderTest = new SystemStreamPartition("kafka", TOPIC2, new Partition(4))
+    val otherSsp = new SystemStreamPartition("kafka", TOPIC2, new Partition(13))
+
+    assertNull(systemAdmin.getNewestOffset(sspUnderTest, 3))
+    assertNull(systemAdmin.getNewestOffset(otherSsp, 3))
+
+    // Add a new message to one of the partitions, and verify that it works as expected.
+    assertEquals("0", producer.send(new ProducerRecord(TOPIC2, 4, "key1".getBytes, "val1".getBytes)).get().offset().toString)
+    assertEquals("0", systemAdmin.getNewestOffset(sspUnderTest, 3))
+    assertNull(systemAdmin.getNewestOffset(otherSsp, 3))
+
+    // Again
+    assertEquals("1", producer.send(new ProducerRecord(TOPIC2, 4, 
"key2".getBytes, "val2".getBytes)).get().offset().toString)
+    assertEquals("1", systemAdmin.getNewestOffset(sspUnderTest, 3))
+    assertNull(systemAdmin.getNewestOffset(otherSsp, 3))
+
+    // Add a message to both partitions
+    assertEquals("2", producer.send(new ProducerRecord(TOPIC2, 4, 
"key3".getBytes, "val3".getBytes)).get().offset().toString)
+    assertEquals("0", producer.send(new ProducerRecord(TOPIC2, 13, 
"key4".getBytes, "val4".getBytes)).get().offset().toString)
+    assertEquals("2", systemAdmin.getNewestOffset(sspUnderTest, 0))
+    assertEquals("0", systemAdmin.getNewestOffset(otherSsp, 0))
+  }
+
+  @Test (expected = classOf[LeaderNotAvailableException])
+  def testGetNewestOffsetMaxRetry {
+    val expectedRetryCount = 3
+    val systemAdmin = new KafkaSystemAdminWithTopicMetadataError
+    try {
+      systemAdmin.getNewestOffset(new SystemStreamPartition(SYSTEM, "quux", new Partition(0)), 3)
+    } catch {
+      case e: Exception =>
+        assertEquals(expectedRetryCount + 1, systemAdmin.metadataCallCount)
+        throw e
+    }
+  }
 }
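
One subtlety the last test pins down: maxRetries bounds the retries, not the attempts, so the failing admin's getTopicMetadata is invoked expectedRetryCount + 1 = 4 times (the initial attempt plus three retries) before the LeaderNotAvailableException propagates to the caller.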

http://git-wip-us.apache.org/repos/asf/samza/blob/f4bd84bb/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index f0965ae..38c8fa0 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -196,8 +196,9 @@ class RocksDbKeyValueStore(
 
   def flush {
     metrics.flushes.inc
-    trace("Flushing.")
+    trace("Flushing store: %s" format storeName)
     db.flush(flushOptions)
+    trace("Flushed store: %s" format storeName)
   }
 
   def close() {

http://git-wip-us.apache.org/repos/asf/samza/blob/f4bd84bb/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index ae6717d..e7e4ede 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -185,11 +185,12 @@ class CachedStore[K, V](
   }
 
   override def flush() {
-    trace("Flushing.")
-
+    trace("Purging dirty entries from CachedStore.")
     metrics.flushes.inc
     putAllDirtyEntries()
+    trace("Flushing store.")
     store.flush()
+    trace("Flushed store.")
   }
 
   private def putAllDirtyEntries() {
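
The reordered traces here also document an invariant of the flush path: putAllDirtyEntries() must run before store.flush(), since the underlying store can only persist writes it has already received. Together with the matching "Flushing store."/"Flushed store." pairs added to LoggedStore, SerializedKeyValueStore, and RocksDbKeyValueStore below, a slow commit can now be attributed to a specific layer of the store stack.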

http://git-wip-us.apache.org/repos/asf/samza/blob/f4bd84bb/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
index 7bba6ff..dc5cbcd 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
@@ -100,11 +100,12 @@ class LoggedStore[K, V](
   }
 
   def flush {
-    trace("Flushing.")
+    trace("Flushing store.")
 
     metrics.flushes.inc
 
     store.flush
+    trace("Flushed store.")
   }
 
   def close {

http://git-wip-us.apache.org/repos/asf/samza/blob/f4bd84bb/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
index 8e183ef..d77d476 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
@@ -110,11 +110,12 @@ class SerializedKeyValueStore[K, V](
   }
 
   def flush {
-    trace("Flushing.")
+    trace("Flushing store.")
 
     metrics.flushes.inc
 
     store.flush
+    trace("Flushed store.")
   }
 
   def close {
