Repository: samza Updated Branches: refs/heads/master 3a4886a65 -> 79ec5dbfc
SAMZA-736: Fixed infinite loop in BrokerProxy when OOME/Stackoverflow occurs Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/79ec5dbf Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/79ec5dbf Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/79ec5dbf Branch: refs/heads/master Commit: 79ec5dbfcf80e9c346baf585dba20e7bd3098ff1 Parents: 3a4886a Author: Aleksandar Pejakovic <[email protected]> Authored: Mon Aug 24 01:38:59 2015 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Mon Aug 24 01:38:59 2015 -0700 ---------------------------------------------------------------------- .../samza/util/ExponentialSleepStrategy.scala | 2 + .../apache/samza/system/kafka/BrokerProxy.scala | 7 +- .../samza/system/kafka/TestBrokerProxy.scala | 189 ++++++++++++------- 3 files changed, 131 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/79ec5dbf/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala b/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala index 376b277..4a04c13 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala @@ -84,6 +84,8 @@ class ExponentialSleepStrategy( } catch { case e: InterruptedException => throw e case e: ClosedByInterruptException => throw e + case e: OutOfMemoryError => throw e + case e: StackOverflowError => throw e case e: Exception => onException(e, loop) } if (!loop.isDone && !Thread.currentThread.isInterrupted) loop.sleep http://git-wip-us.apache.org/repos/asf/samza/blob/79ec5dbf/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala index 614f33f..c8cbc38 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala @@ -28,6 +28,7 @@ import kafka.api._ import kafka.common.{NotLeaderForPartitionException, UnknownTopicOrPartitionException, ErrorMapping, TopicAndPartition} import kafka.consumer.ConsumerConfig import kafka.message.MessageSet +import org.apache.samza.SamzaException import org.apache.samza.util.ExponentialSleepStrategy import org.apache.samza.util.Logging import org.apache.samza.util.ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX @@ -84,7 +85,7 @@ class BrokerProxy( val hostString = "%s:%d" format (host, port) info("Creating new SimpleConsumer for host %s for system %s" format (hostString, system)) - val sc = new DefaultFetchSimpleConsumer(host, port, timeout, bufferSize, clientID, fetchSize, consumerMinSize, consumerMaxWait) + val sc = new DefaultFetchSimpleConsumer(host, port, timeout, bufferSize, clientID, fetchSize, consumerMinSize, consumerMaxWait) sc } @@ -160,8 +161,10 @@ class BrokerProxy( reconnect = true }) } catch { - case e: InterruptedException => info("Got interrupt exception in broker proxy thread.") + case e: InterruptedException => info("Got interrupt exception in broker proxy thread.") case e: ClosedByInterruptException => info("Got closed by interrupt exception in broker proxy thread.") + case e: OutOfMemoryError => throw new SamzaException("Got out of memory error in broker proxy thread.") + case e: StackOverflowError => throw new SamzaException("Got stack overflow error in broker proxy thread.") } if (Thread.currentThread.isInterrupted) info("Shutting down due to interrupt.") http://git-wip-us.apache.org/repos/asf/samza/blob/79ec5dbf/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala index e285dec..170318e 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala @@ -21,25 +21,21 @@ package org.apache.samza.system.kafka import java.nio.ByteBuffer -import java.nio.channels.ClosedChannelException import java.util.concurrent.CountDownLatch -import kafka.api._ -import kafka.api.PartitionOffsetsResponse -import kafka.common.ErrorMapping -import kafka.common.TopicAndPartition +import kafka.api.{PartitionOffsetsResponse, _} +import kafka.common.{ErrorMapping, TopicAndPartition} import kafka.consumer.SimpleConsumer -import kafka.message.{MessageSet, Message, MessageAndOffset, ByteBufferMessageSet} - +import kafka.message.{ByteBufferMessageSet, Message, MessageAndOffset, MessageSet} import org.apache.samza.SamzaException import org.apache.samza.util.Logging -import org.junit._ import org.junit.Assert._ -import org.mockito.{Matchers, Mockito} -import org.mockito.Mockito._ +import org.junit._ import org.mockito.Matchers._ -import org.mockito.stubbing.Answer +import org.mockito.Mockito._ import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer +import org.mockito.{Matchers, Mockito} import scala.collection.JavaConversions._ @@ -47,14 +43,56 @@ class TestBrokerProxy extends Logging { val tp2 = new TopicAndPartition("Redbird", 2013) var fetchTp1 = true // control whether fetching tp1 messages or not + @Test def brokerProxyRetrievesMessagesCorrectly() = { + val (bp, tp, sink) = getMockBrokerProxy() + + bp.start + bp.addTopicPartition(tp, Option("0")) + // Add tp2, which should never receive messages since sink disables it. + bp.addTopicPartition(tp2, Option("0")) + Thread.sleep(1000) + assertEquals(2, sink.receivedMessages.size) + assertEquals(42, sink.receivedMessages.get(0)._2.offset) + assertEquals(84, sink.receivedMessages.get(1)._2.offset) + } + + @Test def brokerProxySkipsFetchForEmptyRequests() = { + val (bp, tp, sink) = getMockBrokerProxy() + + bp.start + // Only add tp2, which should never receive messages since sink disables it. + bp.addTopicPartition(tp2, Option("0")) + Thread.sleep(1000) + assertEquals(0, sink.receivedMessages.size) + assertTrue(bp.metrics.brokerSkippedFetchRequests(bp.host, bp.port).getCount > 0) + assertEquals(0, bp.metrics.brokerReads(bp.host, bp.port).getCount) + } + + @Test def brokerProxyThrowsExceptionOnDuplicateTopicPartitions() = { + val (bp, tp, _) = getMockBrokerProxy() + bp.start + bp.addTopicPartition(tp, Option("0")) + + try { + bp.addTopicPartition(tp, Option("1")) + fail("Should have thrown an exception") + } catch { + case se: SamzaException => assertEquals(se.getMessage, "Already consuming TopicPartition [Redbird,2012]") + case other: Exception => fail("Got some other exception than what we were expecting: " + other) + } + } + def getMockBrokerProxy() = { val sink = new MessageSink { val receivedMessages = new scala.collection.mutable.ListBuffer[(TopicAndPartition, MessageAndOffset, Boolean)]() + def abdicate(tp: TopicAndPartition, nextOffset: Long) {} def refreshDropped() {} - def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) { receivedMessages.add((tp, msg, msg.offset.equals(highWatermark))) } + def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) { + receivedMessages.add((tp, msg, msg.offset.equals(highWatermark))) + } def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) { } @@ -82,8 +120,10 @@ class TestBrokerProxy extends Logging { sink, offsetGetter = new GetOffset("fail", Map("Redbird" -> "largest"))) { - override val sleepMSWhileNoTopicPartitions = 100 // Speed up for test + override val sleepMSWhileNoTopicPartitions = 100 + // Speed up for test var alreadyCreatedConsumer = false + // Scala traits and Mockito mocks don't mix, unfortunately. override def createSimpleConsumer() = { if (alreadyCreatedConsumer) { @@ -139,8 +179,8 @@ class TestBrokerProxy extends Logging { override def send(request: TopicMetadataRequest): TopicMetadataResponse = sc.send(request) override def fetch(request: FetchRequest): FetchResponse = { - // Verify that we only get fetch requests for one tp, even though - // two were registered. This is to verify that + // Verify that we only get fetch requests for one tp, even though + // two were registered. This is to verify that // sink.needsMoreMessages works. assertEquals(1, request.requestInfo.size) sc.fetch(request) @@ -163,45 +203,6 @@ class TestBrokerProxy extends Logging { (bp, tp, sink) } - @Test def brokerProxyRetrievesMessagesCorrectly() = { - val (bp, tp, sink) = getMockBrokerProxy() - - bp.start - bp.addTopicPartition(tp, Option("0")) - // Add tp2, which should never receive messages since sink disables it. - bp.addTopicPartition(tp2, Option("0")) - Thread.sleep(1000) - assertEquals(2, sink.receivedMessages.size) - assertEquals(42, sink.receivedMessages.get(0)._2.offset) - assertEquals(84, sink.receivedMessages.get(1)._2.offset) - } - - @Test def brokerProxySkipsFetchForEmptyRequests() = { - val (bp, tp, sink) = getMockBrokerProxy() - - bp.start - // Only add tp2, which should never receive messages since sink disables it. - bp.addTopicPartition(tp2, Option("0")) - Thread.sleep(1000) - assertEquals(0, sink.receivedMessages.size) - assertTrue(bp.metrics.brokerSkippedFetchRequests(bp.host, bp.port).getCount > 0) - assertEquals(0, bp.metrics.brokerReads(bp.host, bp.port).getCount) - } - - @Test def brokerProxyThrowsExceptionOnDuplicateTopicPartitions() = { - val (bp, tp, _) = getMockBrokerProxy() - bp.start - bp.addTopicPartition(tp, Option("0")) - - try { - bp.addTopicPartition(tp, Option("1")) - fail("Should have thrown an exception") - } catch { - case se: SamzaException => assertEquals(se.getMessage, "Already consuming TopicPartition [Redbird,2012]") - case other: Exception => fail("Got some other exception than what we were expecting: " + other) - } - } - @Test def brokerProxyUpdateLatencyMetrics() = { val (bp, tp, _) = getMockBrokerProxy() @@ -221,10 +222,10 @@ class TestBrokerProxy extends Logging { fetchTp1 = true } - @Test def brokerProxyCorrectlyHandlesOffsetOutOfRange():Unit = { + @Test def brokerProxyCorrectlyHandlesOffsetOutOfRange(): Unit = { // Need to wait for the thread to do some work before ending the test val countdownLatch = new CountDownLatch(1) - var failString:String = null + var failString: String = null val mockMessageSink = mock(classOf[MessageSink]) when(mockMessageSink.needsMoreMessages(any())).thenReturn(true) @@ -243,13 +244,14 @@ class TestBrokerProxy extends Logging { // Create an answer that first indicates offset out of range on first invocation and on second // verifies that the parameters have been updated to what we expect them to be - val answer = new Answer[FetchResponse](){ + val answer = new Answer[FetchResponse]() { var invocationCount = 0 + def answer(invocation: InvocationOnMock): FetchResponse = { val arguments = invocation.getArguments()(0).asInstanceOf[List[Object]](0).asInstanceOf[(String, Long)] - if(invocationCount == 0) { - if(arguments != (tp, 0)) { + if (invocationCount == 0) { + if (arguments !=(tp, 0)) { failString = "First invocation did not have the right arguments: " + arguments countdownLatch.countDown() } @@ -266,7 +268,7 @@ class TestBrokerProxy extends Logging { invocationCount += 1 mfr } else { - if(arguments != (tp, 1492)) { + if (arguments !=(tp, 1492)) { failString = "On second invocation, arguments were not correct: " + arguments } countdownLatch.countDown() @@ -275,7 +277,7 @@ class TestBrokerProxy extends Logging { } } } - + when(mockSimpleConsumer.defaultFetch(any())).thenAnswer(answer) // So now we have a fetch response that will fail. Prime the mockGetOffset to send us to a new offset @@ -283,7 +285,7 @@ class TestBrokerProxy extends Logging { val bp = new BrokerProxy("host", 423, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) { override def createSimpleConsumer() = { - if(callsToCreateSimpleConsumer > 1) { + if (callsToCreateSimpleConsumer > 1) { failString = "Tried to create more than one simple consumer" countdownLatch.countDown() } @@ -296,31 +298,35 @@ class TestBrokerProxy extends Logging { bp.start countdownLatch.await() bp.stop - if(failString != null) { + if (failString != null) { fail(failString) } } /** - * Test that makes sure that BrokerProxy abdicates all TopicAndPartitions + * Test that makes sure that BrokerProxy abdicates all TopicAndPartitions * that it owns when a consumer failure occurs. */ - @Test def brokerProxyAbdicatesOnConnectionFailure():Unit = { + @Test def brokerProxyAbdicatesOnConnectionFailure(): Unit = { val countdownLatch = new CountDownLatch(1) var abdicated: Option[TopicAndPartition] = None @volatile var refreshDroppedCount = 0 val mockMessageSink = new MessageSink { override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) { } + override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) { } + override def abdicate(tp: TopicAndPartition, nextOffset: Long) { abdicated = Some(tp) countdownLatch.countDown } + override def refreshDropped() { refreshDroppedCount += 1 } + override def needsMoreMessages(tp: TopicAndPartition): Boolean = { true } @@ -358,4 +364,57 @@ class TestBrokerProxy extends Logging { bp.stop assertEquals(tp, abdicated.getOrElse(null)) } + + @Test def brokerProxyAbdicatesHardErrors(): Unit = { + val doNothingMetrics = new KafkaSystemConsumerMetrics + val mockMessageSink = new MessageSink { + override def needsMoreMessages(tp: TopicAndPartition): Boolean = true + override def abdicate(tp: TopicAndPartition, nextOffset: Long) {} + override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) {} + override def refreshDropped() {throw new OutOfMemoryError("Test - OOME")} + override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean): Unit = {} + } + val mockOffsetGetter = mock(classOf[GetOffset]) + val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer]) + + val bp = new BrokerProxy("host", 658, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) { + override def createSimpleConsumer() = { + mockSimpleConsumer + } + } + var caughtError = false + try { + bp.thread.run + } catch { + case e: SamzaException => { + assertEquals(e.getMessage, "Got out of memory error in broker proxy thread.") + info("Received OutOfMemoryError in broker proxy.") + caughtError = true + } + } + assertEquals(true, caughtError) + val mockMessageSink2 = new MessageSink { + override def needsMoreMessages(tp: TopicAndPartition): Boolean = true + override def abdicate(tp: TopicAndPartition, nextOffset: Long): Unit = {} + override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long): Unit = {} + override def refreshDropped(): Unit = {throw new StackOverflowError("Test - SOE")} + override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean): Unit = {} + } + caughtError = false + val bp2 = new BrokerProxy("host", 689, "system", "clientID2", doNothingMetrics, mockMessageSink2, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) { + override def createSimpleConsumer() = { + mockSimpleConsumer + } + } + try { + bp2.thread.run + } catch { + case e: SamzaException => { + assertEquals(e.getMessage, "Got stack overflow error in broker proxy thread.") + info("Received StackOverflowError in broker proxy.") + caughtError = true + } + } + assertEquals(true, caughtError) + } }
