Repository: incubator-samza
Updated Branches:
  refs/heads/master f3cb10924 -> 354bcdb77


SAMZA-151: Fail early when a consumer is misconfigured


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/354bcdb7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/354bcdb7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/354bcdb7

Branch: refs/heads/master
Commit: 354bcdb77bc8d4d1b0efdf9ca405916c7399075a
Parents: f3cb109
Author: Yan Fang <yanfang724 at gmail dot com>
Authored: Thu May 1 14:50:51 2014 -0700
Committer: Jakob Homan <[email protected]>
Committed: Thu May 1 14:50:51 2014 -0700

----------------------------------------------------------------------
 .../apache/samza/system/SystemConsumers.scala   | 17 +++++++++-
 .../samza/system/TestSystemConsumers.scala      | 35 ++++++++++++++++++++
 2 files changed, 51 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/354bcdb7/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 
b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index 7624aef..b537046 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -26,6 +26,7 @@ import grizzled.slf4j.Logging
 import org.apache.samza.system.chooser.MessageChooser
 import org.apache.samza.util.DoublingBackOff
 import org.apache.samza.system.chooser.BufferingMessageChooser
+import org.apache.samza.SamzaException
 
 /**
  * The SystemConsumers class coordinates between all SystemConsumers, the
@@ -159,7 +160,12 @@ class SystemConsumers(
     metrics.registerSystemStream(systemStreamPartition.getSystemStream)
     buffer.register(systemStreamPartition, offset)
     updateFetchMap(systemStreamPartition, maxMsgsPerStreamPartition)
-    consumers(systemStreamPartition.getSystem).register(systemStreamPartition, 
offset)
+
+    try {
+      
consumers(systemStreamPartition.getSystem).register(systemStreamPartition, 
offset)
+    } catch {
+      case e: NoSuchElementException => throw new 
SystemConsumersException("can't register " + systemStreamPartition.getSystem + 
"'s consumer.", e)
+    }
   }
 
   /**
@@ -280,3 +286,12 @@ class SystemConsumers(
       .foreach(updateFetchMap(_))
   }
 }
+
+/**
+ * Registering a consumer can fail in ways the system cannot recover from,
+ * such as when a failed consumer is used in task.input or changelogs.
+ * SystemConsumersException is thrown to indicate such a hard failure.
+ */
+class SystemConsumersException(s: String, t: Throwable) extends 
SamzaException(s, t) {
+  def this(s: String) = this(s, null)
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/354bcdb7/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala 
b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
index e1b211d..e1a4c15 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
@@ -58,4 +58,39 @@ class TestSystemConsumers {
     assertEquals(1, registered.size)
     assertEquals("0", registered(systemStreamPartition))
   }
+
+  @Test
+  def testThrowSystemConsumersExceptionWhenTheSystemDoesNotHaveConsumer() {
+    val system = "test-system"
+    val system2 = "test-system2"
+    val systemStreamPartition = new SystemStreamPartition(system, 
"some-stream", new Partition(1))
+    val systemStreamPartition2 = new SystemStreamPartition(system2, 
"some-stream", new Partition(1))
+    var started = 0
+    var stopped = 0
+    var registered = Map[SystemStreamPartition, String]()
+    
+    val consumer = Map(system -> new SystemConsumer {
+      def start {}
+      def stop {}
+      def register(systemStreamPartition: SystemStreamPartition, offset: 
String) {}
+      def poll(systemStreamPartitions: java.util.Map[SystemStreamPartition, 
java.lang.Integer], timeout: Long) = List()
+    })
+    val consumers = new SystemConsumers(new MessageChooser {
+      def update(envelope: IncomingMessageEnvelope) = Unit
+      def choose = null
+      def start = started += 1
+      def stop = stopped += 1
+      def register(systemStreamPartition: SystemStreamPartition, offset: 
String) = registered += systemStreamPartition -> offset
+    }, consumer, null)
+     
+    // it should throw a SystemConsumersException because system2 does not 
have a consumer
+    var caughtRightException = false
+    try {
+       consumers.register(systemStreamPartition2, "0")
+    } catch {
+       case e: SystemConsumersException => caughtRightException = true
+       case _: Throwable => caughtRightException = false
+    }
+    assertTrue("suppose to throw SystemConsumersException, but apparently it 
did not", caughtRightException)
+  }
 }
\ No newline at end of file

Reply via email to