Repository: incubator-samza Updated Branches: refs/heads/master f3cb10924 -> 354bcdb77
SAMZA-151: Fail early when a consumer is misconfigured Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/354bcdb7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/354bcdb7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/354bcdb7 Branch: refs/heads/master Commit: 354bcdb77bc8d4d1b0efdf9ca405916c7399075a Parents: f3cb109 Author: Yan Fang <yanfang724 at gmail dot com> Authored: Thu May 1 14:50:51 2014 -0700 Committer: Jakob Homan <[email protected]> Committed: Thu May 1 14:50:51 2014 -0700 ---------------------------------------------------------------------- .../apache/samza/system/SystemConsumers.scala | 17 +++++++++- .../samza/system/TestSystemConsumers.scala | 35 ++++++++++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/354bcdb7/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala index 7624aef..b537046 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala @@ -26,6 +26,7 @@ import grizzled.slf4j.Logging import org.apache.samza.system.chooser.MessageChooser import org.apache.samza.util.DoublingBackOff import org.apache.samza.system.chooser.BufferingMessageChooser +import org.apache.samza.SamzaException /** * The SystemConsumers class coordinates between all SystemConsumers, the @@ -159,7 +160,12 @@ class SystemConsumers( metrics.registerSystemStream(systemStreamPartition.getSystemStream) buffer.register(systemStreamPartition, offset) updateFetchMap(systemStreamPartition, maxMsgsPerStreamPartition) - consumers(systemStreamPartition.getSystem).register(systemStreamPartition, offset) + + try { + consumers(systemStreamPartition.getSystem).register(systemStreamPartition, offset) + } catch { + case e: NoSuchElementException => throw new SystemConsumersException("can't register " + systemStreamPartition.getSystem + "'s consumer.", e) + } } /** @@ -280,3 +286,12 @@ class SystemConsumers( .foreach(updateFetchMap(_)) } } + +/** + * When SystemConsumer registers consumers, there are situations where system can not recover + * from. Such as a failed consumer is used in task.input and changelogs. + * SystemConsumersException is thrown to indicate a hard failure when the system can not recover from. + */ +class SystemConsumersException(s: String, t: Throwable) extends SamzaException(s, t) { + def this(s: String) = this(s, null) +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/354bcdb7/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala index e1b211d..e1a4c15 100644 --- a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala @@ -58,4 +58,39 @@ class TestSystemConsumers { assertEquals(1, registered.size) assertEquals("0", registered(systemStreamPartition)) } + + @Test + def testThrowSystemConsumersExceptionWhenTheSystemDoesNotHaveConsumer() { + val system = "test-system" + val system2 = "test-system2" + val systemStreamPartition = new SystemStreamPartition(system, "some-stream", new Partition(1)) + val systemStreamPartition2 = new SystemStreamPartition(system2, "some-stream", new Partition(1)) + var started = 0 + var stopped = 0 + var registered = Map[SystemStreamPartition, String]() + + val consumer = Map(system -> new SystemConsumer { + def start {} + def stop {} + def register(systemStreamPartition: SystemStreamPartition, offset: String) {} + def poll(systemStreamPartitions: java.util.Map[SystemStreamPartition, java.lang.Integer], timeout: Long) = List() + }) + val consumers = new SystemConsumers(new MessageChooser { + def update(envelope: IncomingMessageEnvelope) = Unit + def choose = null + def start = started += 1 + def stop = stopped += 1 + def register(systemStreamPartition: SystemStreamPartition, offset: String) = registered += systemStreamPartition -> offset + }, consumer, null) + + // it should throw a SystemConsumersException because system2 does not have a consumer + var caughtRightException = false + try { + consumers.register(systemStreamPartition2, "0") + } catch { + case e: SystemConsumersException => caughtRightException = true + case _: Throwable => caughtRightException = false + } + assertTrue("suppose to throw SystemConsumersException, but apparently it did not", caughtRightException) + } } \ No newline at end of file
