SAMZA-754: set the oldest offset to 0 on empty topic partition
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6bc141b4 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6bc141b4 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6bc141b4 Branch: refs/heads/samza-sql Commit: 6bc141b4780e00e4d151892780d9208944d6730d Parents: 0e94975 Author: Yi Pan <[email protected]> Authored: Thu Nov 19 00:00:38 2015 -0800 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Thu Nov 19 00:00:38 2015 -0800 ---------------------------------------------------------------------- .../org/apache/samza/system/kafka/KafkaSystemAdmin.scala | 10 ++++++++-- .../apache/samza/system/kafka/TestKafkaSystemAdmin.scala | 8 ++++---- .../samza/test/integration/StreamTaskTestUtil.scala | 2 +- 3 files changed, 13 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/6bc141b4/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala index aee33a9..9dc436a 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala @@ -194,9 +194,15 @@ class KafkaSystemAdmin( upcomingOffsets.foreach { case (topicAndPartition, offset) => if (offset.toLong <= 0) { - debug("Stripping oldest/newest offsets for %s because the topic appears empty." format topicAndPartition) - oldestOffsets -= topicAndPartition + debug("Stripping newest offsets for %s because the topic appears empty." format topicAndPartition) newestOffsets -= topicAndPartition + debug("Setting oldest offset to 0 to consume from beginning") + oldestOffsets.get(topicAndPartition) match { + case Some(s) => + oldestOffsets.updated(topicAndPartition, "0") + case None => + oldestOffsets.put(topicAndPartition, "0") + } } } } finally { http://git-wip-us.apache.org/repos/asf/samza/blob/6bc141b4/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala index d260f2d..6c29223 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala @@ -218,8 +218,8 @@ class TestKafkaSystemAdmin { // Verify partition count. var sspMetadata = metadata(TOPIC).getSystemStreamPartitionMetadata assertEquals(50, sspMetadata.size) - // Empty topics should have null for earliest/latest offset. - assertNull(sspMetadata.get(new Partition(0)).getOldestOffset) + // Empty topics should have null for latest offset and 0 for earliest offset + assertEquals("0", sspMetadata.get(new Partition(0)).getOldestOffset) assertNull(sspMetadata.get(new Partition(0)).getNewestOffset) // Empty Kafka topics should have a next offset of 0. assertEquals("0", sspMetadata.get(new Partition(0)).getUpcomingOffset) @@ -237,7 +237,7 @@ class TestKafkaSystemAdmin { assertEquals("0", sspMetadata.get(new Partition(48)).getNewestOffset) assertEquals("1", sspMetadata.get(new Partition(48)).getUpcomingOffset) // Some other partition should be empty. - assertNull(sspMetadata.get(new Partition(3)).getOldestOffset) + assertEquals("0", sspMetadata.get(new Partition(3)).getOldestOffset) assertNull(sspMetadata.get(new Partition(3)).getNewestOffset) assertEquals("0", sspMetadata.get(new Partition(3)).getUpcomingOffset) @@ -273,7 +273,7 @@ class TestKafkaSystemAdmin { val initialOffsets = systemAdmin.getSystemStreamMetadata(Set("non-existent-topic")) val metadata = initialOffsets.getOrElse("non-existent-topic", fail("missing metadata")) assertEquals(metadata, new SystemStreamMetadata("non-existent-topic", Map( - new Partition(0) -> new SystemStreamPartitionMetadata(null, null, "0")))) + new Partition(0) -> new SystemStreamPartitionMetadata("0", null, "0")))) } @Test http://git-wip-us.apache.org/repos/asf/samza/blob/6bc141b4/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala index 6e260bd..8d7e3fe 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala @@ -98,6 +98,7 @@ object StreamTaskTestUtil { */ var jobConfig = Map( "job.factory.class" -> classOf[ThreadJobFactory].getCanonicalName, + "job.coordinator.system" -> "kafka", "task.inputs" -> "kafka.input", "serializers.registry.string.class" -> "org.apache.samza.serializers.StringSerdeFactory", "systems.kafka.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory", @@ -109,7 +110,6 @@ object StreamTaskTestUtil { "systems.kafka.consumer.zookeeper.connect" -> zkConnect, "systems.kafka.producer.bootstrap.servers" -> ("localhost:%s" format port1), // Since using state, need a checkpoint manager - // Due to SAMZA-754, the following section can not be removed yet. "task.checkpoint.factory" -> "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory", "task.checkpoint.system" -> "kafka", "task.checkpoint.replication.factor" -> "1",
