SAMZA-754: set the oldest offset to 0 on empty topic partition

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6bc141b4
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6bc141b4
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6bc141b4

Branch: refs/heads/samza-sql
Commit: 6bc141b4780e00e4d151892780d9208944d6730d
Parents: 0e94975
Author: Yi Pan <[email protected]>
Authored: Thu Nov 19 00:00:38 2015 -0800
Committer: Yi Pan (Data Infrastructure) <[email protected]>
Committed: Thu Nov 19 00:00:38 2015 -0800

----------------------------------------------------------------------
 .../org/apache/samza/system/kafka/KafkaSystemAdmin.scala  | 10 ++++++++--
 .../apache/samza/system/kafka/TestKafkaSystemAdmin.scala  |  8 ++++----
 .../samza/test/integration/StreamTaskTestUtil.scala       |  2 +-
 3 files changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/6bc141b4/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index aee33a9..9dc436a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -194,9 +194,15 @@ class KafkaSystemAdmin(
             upcomingOffsets.foreach {
               case (topicAndPartition, offset) =>
                 if (offset.toLong <= 0) {
-                  debug("Stripping oldest/newest offsets for %s because the topic appears empty." format topicAndPartition)
-                  oldestOffsets -= topicAndPartition
+                  debug("Stripping newest offsets for %s because the topic appears empty." format topicAndPartition)
                   newestOffsets -= topicAndPartition
+                  debug("Setting oldest offset to 0 to consume from beginning")
+                  oldestOffsets.get(topicAndPartition) match {
+                    case Some(s) =>
+                      oldestOffsets.updated(topicAndPartition, "0")
+                    case None =>
+                      oldestOffsets.put(topicAndPartition, "0")
+                  }
                 }
             }
           } finally {

http://git-wip-us.apache.org/repos/asf/samza/blob/6bc141b4/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index d260f2d..6c29223 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -218,8 +218,8 @@ class TestKafkaSystemAdmin {
     // Verify partition count.
     var sspMetadata = metadata(TOPIC).getSystemStreamPartitionMetadata
     assertEquals(50, sspMetadata.size)
-    // Empty topics should have null for earliest/latest offset.
-    assertNull(sspMetadata.get(new Partition(0)).getOldestOffset)
+    // Empty topics should have null for latest offset and 0 for earliest offset
+    assertEquals("0", sspMetadata.get(new Partition(0)).getOldestOffset)
     assertNull(sspMetadata.get(new Partition(0)).getNewestOffset)
     // Empty Kafka topics should have a next offset of 0.
     assertEquals("0", sspMetadata.get(new Partition(0)).getUpcomingOffset)
@@ -237,7 +237,7 @@ class TestKafkaSystemAdmin {
     assertEquals("0", sspMetadata.get(new Partition(48)).getNewestOffset)
     assertEquals("1", sspMetadata.get(new Partition(48)).getUpcomingOffset)
     // Some other partition should be empty.
-    assertNull(sspMetadata.get(new Partition(3)).getOldestOffset)
+    assertEquals("0", sspMetadata.get(new Partition(3)).getOldestOffset)
     assertNull(sspMetadata.get(new Partition(3)).getNewestOffset)
     assertEquals("0", sspMetadata.get(new Partition(3)).getUpcomingOffset)
 
@@ -273,7 +273,7 @@ class TestKafkaSystemAdmin {
     val initialOffsets = systemAdmin.getSystemStreamMetadata(Set("non-existent-topic"))
     val metadata = initialOffsets.getOrElse("non-existent-topic", fail("missing metadata"))
     assertEquals(metadata, new SystemStreamMetadata("non-existent-topic", Map(
-      new Partition(0) -> new SystemStreamPartitionMetadata(null, null, "0"))))
+      new Partition(0) -> new SystemStreamPartitionMetadata("0", null, "0"))))
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/samza/blob/6bc141b4/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index 6e260bd..8d7e3fe 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -98,6 +98,7 @@ object StreamTaskTestUtil {
    */
   var jobConfig = Map(
     "job.factory.class" -> classOf[ThreadJobFactory].getCanonicalName,
+    "job.coordinator.system" -> "kafka",
     "task.inputs" -> "kafka.input",
     "serializers.registry.string.class" -> "org.apache.samza.serializers.StringSerdeFactory",
     "systems.kafka.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory",
@@ -109,7 +110,6 @@ object StreamTaskTestUtil {
     "systems.kafka.consumer.zookeeper.connect" -> zkConnect,
     "systems.kafka.producer.bootstrap.servers" -> ("localhost:%s" format port1),
     // Since using state, need a checkpoint manager
-    // Due to SAMZA-754, the following section can not be removed yet.
     "task.checkpoint.factory" -> "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
     "task.checkpoint.system" -> "kafka",
     "task.checkpoint.replication.factor" -> "1",

Reply via email to