Repository: incubator-samza
Updated Branches:
  refs/heads/master 085a8b4b6 -> 89939a6c9


SAMZA-178: Don't catch Throwable, because we want OutOfMemoryError to kill the 
process. Reviewed by Chris Riccomini.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/89939a6c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/89939a6c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/89939a6c

Branch: refs/heads/master
Commit: 89939a6c91798ae28dbfd552cb7b2dea9efc3d3f
Parents: 085a8b4
Author: Martin Kleppmann <[email protected]>
Authored: Fri Mar 21 11:09:34 2014 +0000
Committer: Martin Kleppmann <[email protected]>
Committed: Fri Mar 21 17:02:07 2014 +0000

----------------------------------------------------------------------
 .../scala/org/apache/samza/container/SamzaContainer.scala   | 6 +++---
 .../main/scala/org/apache/samza/job/local/ThreadJob.scala   | 2 +-
 .../org/apache/samza/serializers/CheckpointSerde.scala      | 8 ++++++--
 .../apache/samza/util/TestExponentialSleepStrategy.scala    | 3 +--
 .../src/test/scala/org/apache/samza/util/TestUtil.scala     | 9 +++++----
 .../scala/org/apache/samza/system/kafka/BrokerProxy.scala   | 1 -
 .../org/apache/samza/system/kafka/TestBrokerProxy.scala     | 2 +-
 .../apache/samza/system/kafka/TestKafkaSystemAdmin.scala    | 3 +--
 .../apache/samza/system/kafka/TestKafkaSystemFactory.scala  | 4 ++--
 .../apache/samza/test/integration/TestStatefulTask.scala    | 2 +-
 .../scala/org/apache/samza/job/yarn/YarnAppMaster.scala     | 2 +-
 .../main/scala/org/apache/samza/webapp/WebAppServer.scala   | 4 +---
 12 files changed, 23 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/89939a6c/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index c101b59..77bf0e9 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -133,7 +133,7 @@ object SamzaContainer extends Logging {
         try {
           (systemName, systemFactory.getConsumer(systemName, config, samzaContainerMetrics.registry))
         } catch {
-          case e: Throwable =>
+          case e: Exception =>
             info("Failed to create a consumer for %s, so skipping." format systemName)
             debug(e)
             (systemName, null)
@@ -150,7 +150,7 @@ object SamzaContainer extends Logging {
           try {
             (systemName, systemFactory.getProducer(systemName, config, samzaContainerMetrics.registry))
           } catch {
-            case e: Throwable =>
+            case e: Exception =>
               info("Failed to create a producer for %s, so skipping." format systemName)
               debug(e)
               (systemName, null)
@@ -512,7 +512,7 @@ class SamzaContainer(
         }
       }
     } catch {
-      case e: Throwable =>
+      case e: Exception =>
         error("Caught exception in process loop.", e)
         throw e
     } finally {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/89939a6c/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
index 62994b0..0acc3de 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
@@ -42,7 +42,7 @@ class ThreadJob(runnable: Runnable) extends StreamJob with Logging {
           runnable.run
           jobStatus = Some(SuccessfulFinish)
         } catch {
-          case e: Throwable => {
+          case e: Exception => {
             error("Failing job with exception.", e)
             jobStatus = Some(UnsuccessfulFinish)
             throw e

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/89939a6c/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
index f93f1c8..2ed8d7d 100644
--- a/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
@@ -24,8 +24,9 @@ import org.codehaus.jackson.map.ObjectMapper
 import org.apache.samza.system.SystemStream
 import org.apache.samza.checkpoint.Checkpoint
 import org.apache.samza.SamzaException
+import grizzled.slf4j.Logging
 
-class CheckpointSerde extends Serde[Checkpoint] {
+class CheckpointSerde extends Serde[Checkpoint] with Logging {
   val jsonMapper = new ObjectMapper()
 
   def fromBytes(bytes: Array[Byte]): Checkpoint = {
@@ -38,7 +39,10 @@ class CheckpointSerde extends Serde[Checkpoint] {
         }
       return new Checkpoint(checkpointMap)
     } catch {
-      case _ : Throwable => return null
+      case e : Exception =>
+        warn("Exception while deserializing checkpoint: " + e)
+        debug(e)
+        null
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/89939a6c/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
index 6cea6a2..4a561d1 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
@@ -113,7 +113,6 @@ class TestExponentialSleepStrategy {
       fail("expected exception to be thrown")
     } catch {
       case e: IllegalArgumentException => assertEquals("boom", e.getMessage)
-      case e: Throwable => throw e
     }
     assertEquals(1, iterations)
     assertEquals(0, loopObject.sleepCount)
@@ -123,7 +122,7 @@ class TestExponentialSleepStrategy {
     var exception: Option[Throwable] = None
     val interruptee = new Thread(new Runnable {
       def run {
-        try { operation } catch { case e: Throwable => exception = Some(e) }
+        try { operation } catch { case e: Exception => exception = Some(e) }
       }
     })
     interruptee.start

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/89939a6c/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
index 1bfd63c..49aed36 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
@@ -33,10 +33,11 @@ object TestUtil {
   def expect[T](exception: Class[T], msg: Option[String] = None)(block: => Unit) = try {
     block
   } catch {
-    case e => if (msg.isDefined) {
-      assertEquals(msg.get, e.getMessage)
-    }
-    case _ => fail("Expected an NPE.")
+    case e: Exception =>
+      assertEquals(e.getClass, exception)
+      if (msg.isDefined) {
+        assertEquals(msg.get, e.getMessage)
+      }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/89939a6c/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index f240d69..bca2f86 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -225,7 +225,6 @@ class BrokerProxy(
         // UnknownTopic or NotLeader are routine events and handled via abdication.  All others, bail.
         case _ @ (_:UnknownTopicOrPartitionException | _: NotLeaderForPartitionException) => warn("Received (UnknownTopicOr|NotLeaderFor)Partition exception. Abdicating")
                                                                                
              abdicate(e.tp)
-        case other : Throwable => throw other
       }
     })
   }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/89939a6c/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index bd1f824..9c0ca60 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -190,7 +190,7 @@ class TestBrokerProxy extends Logging {
       fail("Should have thrown an exception")
     } catch {
       case se: SamzaException => assertEquals(se.getMessage, "Already consuming TopicPartition [Redbird,2012]")
-      case other: Throwable => fail("Got some other exception than what we were expecting: " + other)
+      case other: Exception => fail("Got some other exception than what we were expecting: " + other)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/89939a6c/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index e43970c..6be9732 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -117,7 +117,7 @@ object TestKafkaSystemAdmin {
 
         done = expectedPartitionCount == topicMetadata.partitionsMetadata.size
       } catch {
-        case e: Throwable =>
+        case e: Exception =>
           System.err.println("Got exception while validating test topics. Waiting and retrying.", e)
           retries += 1
           Thread.sleep(500)
@@ -308,7 +308,6 @@ class TestKafkaSystemAdmin {
       fail("expected CallLimitReached to be thrown")
     } catch {
       case e: ExponentialSleepStrategy.CallLimitReached => ()
-      case e: Throwable => throw e
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/89939a6c/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
index d08b9fa..aba39c0 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
@@ -39,7 +39,7 @@ class TestKafkaSystemFactory {
       fail("Expected to get a Samza exception.")
     } catch {
       case e: SamzaException => None // expected
-      case _ : Throwable => fail("Expected to get a Samza exception.")
+      case e: Exception => fail("Expected SamzaException, but got " + e)
     }
   }
 
@@ -56,7 +56,7 @@ class TestKafkaSystemFactory {
       fail("Expected to get a Samza exception.")
     } catch {
       case e: SamzaException => None // expected
-      case _ : Throwable => fail("Expected to get a Samza exception.")
+      case e: Exception => fail("Expected SamzaException, but got " + e)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/89939a6c/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index 8177cbf..6fdfcfc 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -155,7 +155,7 @@ object TestStatefulTask {
 
         done = true
       } catch {
-        case e: Throwable =>
+        case e: Exception =>
           System.err.println("Got exception while validating test topics. Waiting and retrying.", e)
           retries += 1
           Thread.sleep(500)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/89939a6c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala
index 4938192..e45c177 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala
@@ -78,7 +78,7 @@ class YarnAppMaster(pollIntervalMs: Long, listeners: List[YarnAppMasterListener]
       listeners.foreach(listener => try {
         listener.onShutdown
       } catch {
-        case e: Throwable => warn("Listener %s failed to shutdown." format listener, e)
+        case e: Exception => warn("Listener %s failed to shutdown." format listener, e)
       })
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/89939a6c/samza-yarn/src/main/scala/org/apache/samza/webapp/WebAppServer.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/WebAppServer.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/WebAppServer.scala
index d524996..4eaaf7c 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/webapp/WebAppServer.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/WebAppServer.scala
@@ -54,9 +54,7 @@ class WebAppServer(rootPath: String) {
       val connector : Connector = server.getConnectors()(0).asInstanceOf[Connector]
       port = connector.getLocalPort
     } catch {
-      case e: Throwable => {
-        throw new SamzaException("Error when getting the port", e)
-      }
+      case e: Exception => throw new SamzaException("Error when getting the port", e)
     }
   }
 }

Reply via email to