Repository: incubator-samza Updated Branches: refs/heads/master a8bbf251c -> 2283fd236
SAMZA-461; fix race condition when starting job with an empty stream Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/2283fd23 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/2283fd23 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/2283fd23 Branch: refs/heads/master Commit: 2283fd236b4d4e885e39b70d5c07b972d164de27 Parents: a8bbf25 Author: Ben Kirwin <[email protected]> Authored: Fri Nov 21 10:32:18 2014 -0800 Committer: Chris Riccomini <[email protected]> Committed: Fri Nov 21 10:32:18 2014 -0800 ---------------------------------------------------------------------- .../org/apache/samza/checkpoint/OffsetManager.scala | 9 ++++++++- .../apache/samza/checkpoint/TestOffsetManager.scala | 13 +++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2283fd23/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala index 80c8d0e..a40c87f 100644 --- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala @@ -343,7 +343,14 @@ class OffsetManager( .get(partition) if (systemStreamPartitionMetadata != null) { - val nextOffset = systemStreamPartitionMetadata.getOffset(offsetType) + val nextOffset = { + val requested = systemStreamPartitionMetadata.getOffset(offsetType) + + if (requested == null) { + warn("Requested offset type %s in %s, but the stream is empty. Defaulting to the upcoming offset." format (offsetType, systemStreamPartition)) + systemStreamPartitionMetadata.getOffset(OffsetType.UPCOMING) + } else requested + } debug("Got next default offset %s for %s" format (nextOffset, systemStreamPartition)) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2283fd23/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala index a79ecca..35e7f6b 100644 --- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala +++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala @@ -216,6 +216,19 @@ class TestOffsetManager { assertNull(offsetManager.getLastProcessedOffset(systemStreamPartition1).getOrElse(null)) } + @Test + def testDefaultToUpcomingOnMissingDefault { + val taskName = new TaskName("task-name") + val ssp = new SystemStreamPartition(new SystemStream("test-system", "test-stream"), new Partition(0)) + val sspm = new SystemStreamPartitionMetadata(null, null, "13") + val offsetMeta = new SystemStreamMetadata("test-stream", Map(new Partition(0) -> sspm)) + val settings = new OffsetSetting(offsetMeta, OffsetType.OLDEST, resetOffset = false) + val offsetManager = new OffsetManager(offsetSettings = Map(ssp.getSystemStream -> settings)) + offsetManager.register(taskName, Set(ssp)) + offsetManager.start + assertEquals(Some("13"), offsetManager.getStartingOffset(ssp)) + } + private def getCheckpointManager(systemStreamPartition: SystemStreamPartition, taskName:TaskName = new TaskName("taskName")) = { val checkpoint = new Checkpoint(Map(systemStreamPartition -> "45"))
