Repository: incubator-samza Updated Branches: refs/heads/master 085a8b4b6 -> 89939a6c9
SAMZA-178: Don't catch Throwable, because we want OutOfMemoryError to kill the process. Reviewed by Chris Riccomini. Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/89939a6c Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/89939a6c Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/89939a6c Branch: refs/heads/master Commit: 89939a6c91798ae28dbfd552cb7b2dea9efc3d3f Parents: 085a8b4 Author: Martin Kleppmann <[email protected]> Authored: Fri Mar 21 11:09:34 2014 +0000 Committer: Martin Kleppmann <[email protected]> Committed: Fri Mar 21 17:02:07 2014 +0000 ---------------------------------------------------------------------- .../scala/org/apache/samza/container/SamzaContainer.scala | 6 +++--- .../main/scala/org/apache/samza/job/local/ThreadJob.scala | 2 +- .../org/apache/samza/serializers/CheckpointSerde.scala | 8 ++++++-- .../apache/samza/util/TestExponentialSleepStrategy.scala | 3 +-- .../src/test/scala/org/apache/samza/util/TestUtil.scala | 9 +++++---- .../scala/org/apache/samza/system/kafka/BrokerProxy.scala | 1 - .../org/apache/samza/system/kafka/TestBrokerProxy.scala | 2 +- .../apache/samza/system/kafka/TestKafkaSystemAdmin.scala | 3 +-- .../apache/samza/system/kafka/TestKafkaSystemFactory.scala | 4 ++-- .../apache/samza/test/integration/TestStatefulTask.scala | 2 +- .../scala/org/apache/samza/job/yarn/YarnAppMaster.scala | 2 +- .../main/scala/org/apache/samza/webapp/WebAppServer.scala | 4 +--- 12 files changed, 23 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/89939a6c/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index c101b59..77bf0e9 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -133,7 +133,7 @@ object SamzaContainer extends Logging { try { (systemName, systemFactory.getConsumer(systemName, config, samzaContainerMetrics.registry)) } catch { - case e: Throwable => + case e: Exception => info("Failed to create a consumer for %s, so skipping." format systemName) debug(e) (systemName, null) @@ -150,7 +150,7 @@ object SamzaContainer extends Logging { try { (systemName, systemFactory.getProducer(systemName, config, samzaContainerMetrics.registry)) } catch { - case e: Throwable => + case e: Exception => info("Failed to create a producer for %s, so skipping." format systemName) debug(e) (systemName, null) @@ -512,7 +512,7 @@ class SamzaContainer( } } } catch { - case e: Throwable => + case e: Exception => error("Caught exception in process loop.", e) throw e } finally { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/89939a6c/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala index 62994b0..0acc3de 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala @@ -42,7 +42,7 @@ class ThreadJob(runnable: Runnable) extends StreamJob with Logging { runnable.run jobStatus = Some(SuccessfulFinish) } catch { - case e: Throwable => { + case e: Exception => { error("Failing job with exception.", e) jobStatus = Some(UnsuccessfulFinish) throw e http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/89939a6c/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala index f93f1c8..2ed8d7d 100644 --- a/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala +++ b/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala @@ -24,8 +24,9 @@ import org.codehaus.jackson.map.ObjectMapper import org.apache.samza.system.SystemStream import org.apache.samza.checkpoint.Checkpoint import org.apache.samza.SamzaException +import grizzled.slf4j.Logging -class CheckpointSerde extends Serde[Checkpoint] { +class CheckpointSerde extends Serde[Checkpoint] with Logging { val jsonMapper = new ObjectMapper() def fromBytes(bytes: Array[Byte]): Checkpoint = { @@ -38,7 +39,10 @@ class CheckpointSerde extends Serde[Checkpoint] { } return new Checkpoint(checkpointMap) } catch { - case _ : Throwable => return null + case e : Exception => + warn("Exception while deserializing checkpoint: " + e) + debug(e) + null } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/89939a6c/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala index 6cea6a2..4a561d1 100644 --- a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala +++ b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala @@ -113,7 +113,6 @@ class TestExponentialSleepStrategy { fail("expected exception to be thrown") } catch { case e: IllegalArgumentException => assertEquals("boom", e.getMessage) - case e: Throwable => throw e } assertEquals(1, iterations) assertEquals(0, loopObject.sleepCount) @@ -123,7 +122,7 @@ class TestExponentialSleepStrategy { var exception: Option[Throwable] = None val interruptee = new Thread(new Runnable { def run { - try { operation } catch { case e: Throwable => exception = Some(e) } + try { operation } catch { case e: Exception => exception = Some(e) } } }) interruptee.start http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/89939a6c/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala index 1bfd63c..49aed36 100644 --- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala +++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala @@ -33,10 +33,11 @@ object TestUtil { def expect[T](exception: Class[T], msg: Option[String] = None)(block: => Unit) = try { block } catch { - case e => if (msg.isDefined) { - assertEquals(msg.get, e.getMessage) - } - case _ => fail("Expected an NPE.") + case e: Exception => + assertEquals(e.getClass, exception) + if (msg.isDefined) { + assertEquals(msg.get, e.getMessage) + } } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/89939a6c/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala index f240d69..bca2f86 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala @@ -225,7 +225,6 @@ class BrokerProxy( // UnknownTopic or NotLeader are routine events and handled via abdication. All others, bail. case _ @ (_:UnknownTopicOrPartitionException | _: NotLeaderForPartitionException) => warn("Received (UnknownTopicOr|NotLeaderFor)Partition exception. Abdicating") abdicate(e.tp) - case other : Throwable => throw other } }) } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/89939a6c/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala index bd1f824..9c0ca60 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala @@ -190,7 +190,7 @@ class TestBrokerProxy extends Logging { fail("Should have thrown an exception") } catch { case se: SamzaException => assertEquals(se.getMessage, "Already consuming TopicPartition [Redbird,2012]") - case other: Throwable => fail("Got some other exception than what we were expecting: " + other) + case other: Exception => fail("Got some other exception than what we were expecting: " + other) } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/89939a6c/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala index e43970c..6be9732 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala @@ -117,7 +117,7 @@ object TestKafkaSystemAdmin { done = expectedPartitionCount == topicMetadata.partitionsMetadata.size } catch { - case e: Throwable => + case e: Exception => System.err.println("Got exception while validating test topics. Waiting and retrying.", e) retries += 1 Thread.sleep(500) @@ -308,7 +308,6 @@ class TestKafkaSystemAdmin { fail("expected CallLimitReached to be thrown") } catch { case e: ExponentialSleepStrategy.CallLimitReached => () - case e: Throwable => throw e } } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/89939a6c/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala index d08b9fa..aba39c0 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala @@ -39,7 +39,7 @@ class TestKafkaSystemFactory { fail("Expected to get a Samza exception.") } catch { case e: SamzaException => None // expected - case _ : Throwable => fail("Expected to get a Samza exception.") + case e: Exception => fail("Expected SamzaException, but got " + e) } } @@ -56,7 +56,7 @@ class TestKafkaSystemFactory { fail("Expected to get a Samza exception.") } catch { case e: SamzaException => None // expected - case _ : Throwable => fail("Expected to get a Samza exception.") + case e: Exception => fail("Expected SamzaException, but got " + e) } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/89939a6c/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala index 8177cbf..6fdfcfc 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala @@ -155,7 +155,7 @@ object TestStatefulTask { done = true } catch { - case e: Throwable => + case e: Exception => System.err.println("Got exception while validating test topics. Waiting and retrying.", e) retries += 1 Thread.sleep(500) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/89939a6c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala index 4938192..e45c177 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala @@ -78,7 +78,7 @@ class YarnAppMaster(pollIntervalMs: Long, listeners: List[YarnAppMasterListener] listeners.foreach(listener => try { listener.onShutdown } catch { - case e: Throwable => warn("Listener %s failed to shutdown." format listener, e) + case e: Exception => warn("Listener %s failed to shutdown." format listener, e) }) } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/89939a6c/samza-yarn/src/main/scala/org/apache/samza/webapp/WebAppServer.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/WebAppServer.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/WebAppServer.scala index d524996..4eaaf7c 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/webapp/WebAppServer.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/WebAppServer.scala @@ -54,9 +54,7 @@ class WebAppServer(rootPath: String) { val connector : Connector = server.getConnectors()(0).asInstanceOf[Connector] port = connector.getLocalPort } catch { - case e: Throwable => { - throw new SamzaException("Error when getting the port", e) - } + case e: Exception => throw new SamzaException("Error when getting the port", e) } } }
