Repository: samza Updated Branches: refs/heads/1.0.0 55b3e9665 -> bdae04b09
Tests for deprecated Kafka Consumer. Tested by running with the system factory set to org.apache.samza.system.kafka_deprecated.KafkaSystemFactory. Author: Boris S <[email protected]> Author: Boris S <[email protected]> Author: Boris Shkolnik <[email protected]> Reviewers: Cameron Lee <[email protected]> Closes #755 from sborya/OldKafkaConsumer Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/85d19bb1 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/85d19bb1 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/85d19bb1 Branch: refs/heads/1.0.0 Commit: 85d19bb1b5e5329d7917668d8a4a87a544123344 Parents: 55b3e96 Author: Boris S <[email protected]> Authored: Tue Oct 23 18:11:43 2018 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Mon Oct 29 13:34:28 2018 -0700 ---------------------------------------------------------------------- .../kafka_deprecated/TestBrokerProxy.scala | 434 +++++++++++++ .../system/kafka_deprecated/TestGetOffset.scala | 110 ++++ .../kafka_deprecated/TestKafkaSystemAdmin.scala | 351 +++++++++++ .../TestKafkaSystemConsumer.scala | 191 ++++++ .../TestKafkaSystemFactory.scala | 98 +++ .../TestKafkaSystemProducer.scala | 604 +++++++++++++++++++ .../TestTopicMetadataCache.scala | 139 +++++ 7 files changed, 1927 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/85d19bb1/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestBrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestBrokerProxy.scala new file mode 100644 index 0000000..702e674 --- /dev/null +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestBrokerProxy.scala @@ -0,0 +1,434 @@ 
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.samza.system.kafka_deprecated
+
+import java.nio.ByteBuffer
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import kafka.api.{PartitionOffsetsResponse, _}
+import kafka.common.TopicAndPartition
+import kafka.consumer.SimpleConsumer
+import kafka.message.{ByteBufferMessageSet, Message, MessageAndOffset, MessageSet}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.samza.SamzaException
+import org.apache.samza.util.Logging
+import org.junit.Assert._
+import org.junit._
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.mockito.{Matchers, Mockito}
+
+import scala.collection.JavaConverters._
+
+/**
+ * Tests for the deprecated BrokerProxy. Every test that starts a proxy also stops it, so no fetcher thread
+ * outlives its test, and latch/polling waits are bounded so a stuck proxy fails a test instead of hanging it.
+ */
+class TestBrokerProxy extends Logging {
+  val tp2 = new TopicAndPartition("Redbird", 2013)
+  @volatile var fetchTp1 = true // control whether fetching tp1 messages or not; read by the proxy thread
+
+  @Test def brokerProxyRetrievesMessagesCorrectly() = {
+    val (bp, tp, sink) = getMockBrokerProxy()
+
+    bp.start
+    try {
+      bp.addTopicPartition(tp, Option("0"))
+      // Add tp2, which should never receive messages since sink disables it.
+      bp.addTopicPartition(tp2, Option("0"))
+      Thread.sleep(1000)
+      assertEquals(2, sink.receivedMessages.size)
+      assertEquals(42, sink.receivedMessages(0)._2.offset)
+      assertEquals(84, sink.receivedMessages(1)._2.offset)
+    } finally {
+      bp.stop
+    }
+  }
+
+  @Test def brokerProxySkipsFetchForEmptyRequests() = {
+    val (bp, _, sink) = getMockBrokerProxy()
+
+    bp.start
+    try {
+      // Only add tp2, which should never receive messages since sink disables it.
+      bp.addTopicPartition(tp2, Option("0"))
+      Thread.sleep(1000)
+      assertEquals(0, sink.receivedMessages.size)
+      assertTrue(bp.metrics.brokerSkippedFetchRequests.get((bp.host, bp.port)).getCount > 0)
+      assertEquals(0, bp.metrics.brokerReads.get((bp.host, bp.port)).getCount)
+    } finally {
+      bp.stop
+    }
+  }
+
+  @Test def brokerProxyThrowsExceptionOnDuplicateTopicPartitions() = {
+    val (bp, tp, _) = getMockBrokerProxy()
+    bp.start
+    try {
+      bp.addTopicPartition(tp, Option("0"))
+
+      try {
+        bp.addTopicPartition(tp, Option("1"))
+        fail("Should have thrown an exception")
+      } catch {
+        case se: SamzaException => assertEquals("Already consuming TopicPartition [Redbird,2012]", se.getMessage)
+        case other: Exception => fail("Got some other exception than what we were expecting: " + other)
+      }
+    } finally {
+      bp.stop
+    }
+  }
+
+  def getMockBrokerProxy() = {
+    val sink = new MessageSink {
+      val receivedMessages = new scala.collection.mutable.ListBuffer[(TopicAndPartition, MessageAndOffset, Boolean)]()
+
+      // Callbacks whose effects these tests do not observe.
+      def abdicate(tp: TopicAndPartition, nextOffset: Long) {}
+      def refreshDropped() {}
+      def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {}
+
+      def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) {
+        receivedMessages += ((tp, msg, msg.offset.equals(highWatermark)))
+      }
+
+      // Never need messages for tp2, and stop asking for tp1 once fetchTp1 is switched off.
+      def needsMoreMessages(tp: TopicAndPartition): Boolean = !tp.equals(tp2) && fetchTp1
+    }
+
+    val system = "daSystem"
+    val host = "host"
+    val port = 2222
+    val tp = new TopicAndPartition("Redbird", 2012)
+    val metrics = new KafkaSystemConsumerMetrics(system)
+
+    metrics.registerBrokerProxy(host, port)
+    metrics.registerTopicAndPartition(tp)
+    metrics.topicPartitions.get((host, port)).set(1)
+
+    val bp = new BrokerProxy(
+      host,
+      port,
+      system,
+      "daClientId",
+      metrics,
+      sink,
+      offsetGetter = new GetOffset("fail", Map("Redbird" -> "largest"))) {
+
+      override val sleepMSWhileNoTopicPartitions = 100 // Speed up for test
+      var alreadyCreatedConsumer = false
+
+      // Scala traits and Mockito mocks don't mix, unfortunately.
+      override def createSimpleConsumer() = {
+        if (alreadyCreatedConsumer) {
+          System.err.println("Should only be creating one consumer in this test!")
+          throw new InterruptedException("Should only be creating one consumer in this test!")
+        }
+        alreadyCreatedConsumer = true
+
+        new DefaultFetchSimpleConsumer("a", 1, 2, 3, "b", new StreamFetchSizes(42)) {
+          val sc = Mockito.mock(classOf[SimpleConsumer])
+          val mockOffsetResponse = {
+            val offsetResponse = Mockito.mock(classOf[OffsetResponse])
+            val partitionOffsetResponse = {
+              val por = Mockito.mock(classOf[PartitionOffsetsResponse])
+              when(por.offsets).thenReturn(List(1L).toSeq)
+              por
+            }
+
+            val map = scala.Predef.Map[TopicAndPartition, PartitionOffsetsResponse](tp -> partitionOffsetResponse, tp2 -> partitionOffsetResponse)
+            when(offsetResponse.partitionErrorAndOffsets).thenReturn(map)
+            offsetResponse
+          }
+
+          when(sc.getOffsetsBefore(any(classOf[OffsetRequest]))).thenReturn(mockOffsetResponse)
+
+          val fetchResponse = {
+            val fetchResponse = Mockito.mock(classOf[FetchResponse])
+
+            val messageSet = {
+              val messageSet = Mockito.mock(classOf[ByteBufferMessageSet])
+
+              def getMessage() = new Message(Mockito.mock(classOf[ByteBuffer]))
+              val messages = List(new MessageAndOffset(getMessage, 42), new MessageAndOffset(getMessage, 84))
+
+              when(messageSet.sizeInBytes).thenReturn(43)
+              when(messageSet.size).thenReturn(44)
+              when(messageSet.iterator).thenReturn(messages.iterator)
+              when(messageSet.head).thenReturn(messages.head)
+              messageSet
+            }
+
+            val fetchResponsePartitionData = FetchResponsePartitionData(Errors.NONE, 500, messageSet)
+            val map = scala.Predef.Map[TopicAndPartition, FetchResponsePartitionData](tp -> fetchResponsePartitionData)
+
+            when(fetchResponse.data).thenReturn(map.toSeq)
+            when(fetchResponse.messageSet(any(classOf[String]), any(classOf[Int]))).thenReturn(messageSet)
+            fetchResponse
+          }
+          when(sc.fetch(any(classOf[FetchRequest]))).thenReturn(fetchResponse)
+          when(sc.earliestOrLatestOffset(any(classOf[TopicAndPartition]), any(classOf[Long]), any(classOf[Int]))).thenReturn(100)
+
+          override def close() = sc.close()
+          override def send(request: TopicMetadataRequest): TopicMetadataResponse = sc.send(request)
+          override def getOffsetsBefore(request: OffsetRequest): OffsetResponse = sc.getOffsetsBefore(request)
+          override def commitOffsets(request: OffsetCommitRequest): OffsetCommitResponse = sc.commitOffsets(request)
+          override def fetchOffsets(request: OffsetFetchRequest): OffsetFetchResponse = sc.fetchOffsets(request)
+          override def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, earliestOrLatest: Long, consumerId: Int): Long = sc.earliestOrLatestOffset(topicAndPartition, earliestOrLatest, consumerId)
+
+          override def fetch(request: FetchRequest): FetchResponse = {
+            // Verify that we only get fetch requests for one tp, even though
+            // two were registered. This is to verify that
+            // sink.needsMoreMessages works.
+            assertEquals(1, request.requestInfo.size)
+            sc.fetch(request)
+          }
+        }
+      }
+    }
+
+    (bp, tp, sink)
+  }
+
+  @Test def brokerProxyUpdateLatencyMetrics() = {
+    val (bp, tp, _) = getMockBrokerProxy()
+
+    bp.start
+    try {
+      bp.addTopicPartition(tp, Option("0"))
+      Thread.sleep(1000)
+      // update when fetching messages
+      assertEquals(500, bp.metrics.highWatermark.get(tp).getValue)
+      assertEquals(415, bp.metrics.lag.get(tp).getValue)
+
+      fetchTp1 = false
+      Thread.sleep(1000)
+      // update when not fetching messages
+      assertEquals(100, bp.metrics.highWatermark.get(tp).getValue)
+      assertEquals(15, bp.metrics.lag.get(tp).getValue)
+    } finally {
+      bp.stop
+      fetchTp1 = true
+    }
+  }
+
+  @Test def brokerProxyCorrectlyHandlesOffsetOutOfRange(): Unit = {
+    // Need to wait for the thread to do some work before ending the test
+    val countdownLatch = new CountDownLatch(1)
+    var failString: String = null
+
+    val mockMessageSink = mock(classOf[MessageSink])
+    when(mockMessageSink.needsMoreMessages(any())).thenReturn(true)
+
+    val doNothingMetrics = new KafkaSystemConsumerMetrics()
+
+    val tp = new TopicAndPartition("topic", 42)
+
+    val mockOffsetGetter = mock(classOf[GetOffset])
+    // This will be used by the simple consumer below, and this is the response that simple consumer needs
+    when(mockOffsetGetter.isValidOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp), Matchers.eq("0"))).thenReturn(true)
+    when(mockOffsetGetter.getResetOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp))).thenReturn(1492L)
+
+    var callsToCreateSimpleConsumer = 0
+    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
+
+    // Create an answer that first indicates offset out of range on first invocation and on second
+    // verifies that the parameters have been updated to what we expect them to be
+    val answer = new Answer[FetchResponse]() {
+      var invocationCount = 0
+
+      def answer(invocation: InvocationOnMock): FetchResponse = {
+        val arguments = invocation.getArguments()(0).asInstanceOf[Seq[(TopicAndPartition, Long)]].head
+
+        if (invocationCount == 0) {
+          if (arguments != ((tp, 0L))) {
+            failString = "First invocation did not have the right arguments: " + arguments
+            countdownLatch.countDown()
+          }
+          val mfr = mock(classOf[FetchResponse])
+          when(mfr.hasError).thenReturn(true)
+          when(mfr.error("topic", 42)).thenReturn(Errors.OFFSET_OUT_OF_RANGE)
+
+          val response = mock(classOf[FetchResponsePartitionData])
+          when(response.error).thenReturn(Errors.OFFSET_OUT_OF_RANGE)
+          val responseMap = Map(tp -> response)
+          when(mfr.data).thenReturn(responseMap.toSeq)
+          invocationCount += 1
+          mfr
+        } else {
+          if (arguments != ((tp, 1492L))) {
+            failString = "On second invocation, arguments were not correct: " + arguments
+          }
+          countdownLatch.countDown()
+          Thread.currentThread().interrupt()
+          null
+        }
+      }
+    }
+
+    when(mockSimpleConsumer.defaultFetch(any())).thenAnswer(answer)
+
+    // So now we have a fetch response that will fail.  Prime the mockGetOffset to send us to a new offset
+
+    val bp = new BrokerProxy("host", 423, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
+
+      override def createSimpleConsumer() = {
+        if (callsToCreateSimpleConsumer > 1) {
+          failString = "Tried to create more than one simple consumer"
+          countdownLatch.countDown()
+        }
+        callsToCreateSimpleConsumer += 1
+        mockSimpleConsumer
+      }
+    }
+
+    bp.addTopicPartition(tp, Option("0"))
+    bp.start
+    // Bounded wait: a proxy that never signals fails the test instead of hanging it; the proxy is always stopped.
+    val finished = try countdownLatch.await(30, TimeUnit.SECONDS) finally bp.stop
+    assertTrue("Timed out waiting for the broker proxy thread", finished)
+    if (failString != null) {
+      fail(failString)
+    }
+  }
+
+  /**
+   * Test that makes sure that BrokerProxy abdicates all TopicAndPartitions
+   * that it owns when a consumer failure occurs.
+   */
+  @Test def brokerProxyAbdicatesOnConnectionFailure(): Unit = {
+    val countdownLatch = new CountDownLatch(1)
+    var abdicated: Option[TopicAndPartition] = None
+    @volatile var refreshDroppedCount = 0
+    val mockMessageSink = new MessageSink {
+      override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {}
+      override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) {}
+      override def needsMoreMessages(tp: TopicAndPartition): Boolean = true
+
+      override def abdicate(tp: TopicAndPartition, nextOffset: Long) {
+        abdicated = Some(tp)
+        countdownLatch.countDown
+      }
+
+      override def refreshDropped() {
+        refreshDroppedCount += 1
+      }
+    }
+
+    val doNothingMetrics = new KafkaSystemConsumerMetrics()
+    val tp = new TopicAndPartition("topic", 42)
+    val mockOffsetGetter = mock(classOf[GetOffset])
+    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
+
+    when(mockOffsetGetter.isValidOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp), Matchers.eq("0"))).thenReturn(true)
+    when(mockOffsetGetter.getResetOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp))).thenReturn(1492L)
+    when(mockSimpleConsumer.defaultFetch(any())).thenThrow(new SamzaException("Pretend this is a ClosedChannelException. Can't use ClosedChannelException because it's checked, and Mockito doesn't like that."))
+
+    val bp = new BrokerProxy("host", 567, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
+      override def createSimpleConsumer() = mockSimpleConsumer
+    }
+
+    // Polls (bounded) until the proxy thread has called refreshDropped() at least once more.
+    val waitForRefresh = () => {
+      val currentRefreshDroppedCount = refreshDroppedCount
+      val deadline = System.currentTimeMillis + 30000
+      while (refreshDroppedCount == currentRefreshDroppedCount) {
+        assertTrue("Timed out waiting for refreshDropped", System.currentTimeMillis < deadline)
+        Thread.sleep(100)
+      }
+    }
+
+    bp.addTopicPartition(tp, Option("0"))
+    bp.start
+    try {
+      // BP should refresh on startup.
+      waitForRefresh()
+      assertTrue("Timed out waiting for abdication", countdownLatch.await(30, TimeUnit.SECONDS))
+      // BP should continue refreshing after it's abdicated all TopicAndPartitions.
+      waitForRefresh()
+    } finally {
+      bp.stop
+    }
+    assertEquals(tp, abdicated.getOrElse(null))
+  }
+
+  @Test def brokerProxyAbdicatesHardErrors(): Unit = {
+    val doNothingMetrics = new KafkaSystemConsumerMetrics
+    val mockMessageSink = new MessageSink {
+      override def needsMoreMessages(tp: TopicAndPartition): Boolean = true
+      override def abdicate(tp: TopicAndPartition, nextOffset: Long) {}
+      override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) {}
+      override def refreshDropped() {throw new OutOfMemoryError("Test - OOME")}
+      override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean): Unit = {}
+    }
+    val mockOffsetGetter = mock(classOf[GetOffset])
+    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
+
+    val bp = new BrokerProxy("host", 658, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
+      override def createSimpleConsumer() = mockSimpleConsumer
+    }
+    var caughtError = false
+    try {
+      bp.thread.run
+    } catch {
+      case e: SamzaException => {
+        assertEquals("Got out of memory error in broker proxy thread.", e.getMessage)
+        info("Received OutOfMemoryError in broker proxy.")
+        caughtError = true
+      }
+    }
+    assertTrue(caughtError)
+    val mockMessageSink2 = new MessageSink {
+      override def needsMoreMessages(tp: TopicAndPartition): Boolean = true
+      override def abdicate(tp: TopicAndPartition, nextOffset: Long): Unit = {}
+      override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long): Unit = {}
+      override def refreshDropped(): Unit = {throw new StackOverflowError("Test - SOE")}
+      override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean): Unit = {}
+    }
+    caughtError = false
+    val bp2 = new BrokerProxy("host", 689, "system", "clientID2", doNothingMetrics, mockMessageSink2, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
+      override def createSimpleConsumer() = mockSimpleConsumer
+    }
+    try {
+      bp2.thread.run
+    } catch {
+      case e: SamzaException => {
+        assertEquals("Got stack overflow error in broker proxy thread.", e.getMessage)
+        info("Received StackOverflowError in broker proxy.")
+        caughtError = true
+      }
+    }
+    assertTrue(caughtError)
+  }
+
+  @Test
+  def brokerProxyStopCloseConsumer: Unit = {
+    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
+    val bp = new BrokerProxy("host", 0, "system", "clientID", new KafkaSystemConsumerMetrics(), null) {
+      override def createSimpleConsumer() = mockSimpleConsumer
+    }
+    bp.start
+    bp.stop
+    verify(mockSimpleConsumer).close
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/85d19bb1/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestGetOffset.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestGetOffset.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestGetOffset.scala
new file mode 100644
index 0000000..21ee4cd
--- /dev/null
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestGetOffset.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka_deprecated
+
+import java.nio.ByteBuffer
+
+import kafka.api._
+import kafka.common.TopicAndPartition
+import kafka.consumer.SimpleConsumer
+import kafka.message.Message
+import kafka.message.ByteBufferMessageSet
+import org.apache.kafka.common.errors.OffsetOutOfRangeException
+import org.junit._
+import org.junit.Assert._
+import org.mockito.Mockito
+import org.mockito.Mockito._
+import org.mockito.Matchers._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+
+/**
+ * Tests for GetOffset.isValidOffset, driven by a mocked DefaultFetchSimpleConsumer.
+ */
+class TestGetOffset {
+
+  private val outOfRangeOffset : String = "0"
+
+  /**
+   * An empty message set is still a valid offset. It just means that the
+   * offset was for the upcoming message, which hasn't yet been written. The
+   * fetch request times out in such a case, and an empty message set is
+   * returned.
+   */
+  @Test
+  def testIsValidOffsetWorksWithEmptyMessageSet(): Unit = {
+    val getOffset = new GetOffset(OffsetRequest.LargestTimeString)
+    // Should not throw an exception.
+    assertTrue(getOffset.isValidOffset(getMockDefaultFetchSimpleConsumer, TopicAndPartition("foo", 1), "1234"))
+  }
+
+  /**
+   * An offset that the broker rejects with an OffsetOutOfRangeException is
+   * not a valid offset. isValidOffset should report it as invalid (return
+   * false) rather than letting the exception propagate to the caller. The
+   * mock consumer throws that exception for any fetch at `outOfRangeOffset`.
+   */
+  @Test
+  def testIsValidOffsetWorksWithOffsetOutOfRangeException(): Unit = {
+    val getOffset = new GetOffset(OffsetRequest.LargestTimeString)
+    // Should not throw an exception; the out-of-range offset is reported as invalid instead.
+    assertFalse(getOffset.isValidOffset(getMockDefaultFetchSimpleConsumer, TopicAndPartition("foo", 1), outOfRangeOffset))
+  }
+
+  /**
+   * Create a default fetch simple consumer that returns empty message sets,
+   * except for fetches at `outOfRangeOffset`, which throw OffsetOutOfRangeException.
+   */
+  def getMockDefaultFetchSimpleConsumer = {
+    new DefaultFetchSimpleConsumer("", 0, 0, 0, "") {
+      val sc = Mockito.mock(classOf[SimpleConsumer])
+
+      // Build an empty fetch response.
+      val fetchResponse = {
+        val fetchResponse = Mockito.mock(classOf[FetchResponse])
+        val messageSet = {
+          val messageSet = Mockito.mock(classOf[ByteBufferMessageSet])
+          when(messageSet.sizeInBytes).thenReturn(0)
+          when(messageSet.size).thenReturn(0)
+          when(messageSet.iterator).thenReturn(Iterator.empty)
+
+          messageSet
+        }
+        when(fetchResponse.messageSet(any(classOf[String]), any(classOf[Int]))).thenReturn(messageSet)
+
+        fetchResponse
+      }
+
+      doAnswer(new Answer[FetchResponse] {
+        override def answer(invocation: InvocationOnMock): FetchResponse = {
+          if (invocation.getArgumentAt(0, classOf[FetchRequest]).requestInfo.exists(
+            req => req._2.offset.toString == outOfRangeOffset)) {
+            throw new OffsetOutOfRangeException("test exception")
+          }
+          fetchResponse
+        }
+      }).when(sc).fetch(any(classOf[FetchRequest]))
+
+      override def fetch(request: FetchRequest): FetchResponse = {
+        sc.fetch(request)
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/85d19bb1/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemAdmin.scala
new file mode 100644
index 0000000..bf64c03
--- /dev/null
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemAdmin.scala
@@ -0,0 +1,351 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka_deprecated
+
+import java.util.{Properties, UUID}
+
+import kafka.admin.AdminUtils
+import org.apache.kafka.common.errors.LeaderNotAvailableException
+import org.apache.kafka.common.protocol.Errors
+import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector}
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{TestUtils, ZkUtils}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.security.JaasUtils
+import org.apache.samza.Partition
+import org.apache.samza.config.KafkaProducerConfig
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.system.{StreamSpec, SystemStreamMetadata, SystemStreamPartition}
+import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, KafkaUtil, TopicMetadataStore}
+import org.junit.Assert._
+import org.junit._
+
+import scala.collection.JavaConverters._
+
+/**
+ * README: New tests should be added to the Java tests. See TestKafkaSystemAdminJava
+ */
+object TestKafkaSystemAdmin extends KafkaServerTestHarness {
+
+  val SYSTEM = "kafka"
+  val TOPIC = "input"
+  val TOPIC2 = "input2"
+  val TOTAL_PARTITIONS = 50
+  val REPLICATION_FACTOR = 2
+  val zkSecure = JaasUtils.isZkSecurityEnabled()
+
+  protected def numBrokers: Int = 3
+
+  var producer: KafkaProducer[Array[Byte], Array[Byte]] = null
+  var metadataStore: TopicMetadataStore = null
+  var producerConfig: KafkaProducerConfig = null
+  var systemAdmin: KafkaSystemAdmin = null
+
+  override def generateConfigs(): Seq[KafkaConfig] = {
+    val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, true)
+    props.map(KafkaConfig.fromProps)
+  }
+
+  @BeforeClass
+  override def setUp() {
+    super.setUp()
+    val config = new java.util.HashMap[String, String]()
+    config.put("bootstrap.servers", brokerList)
+    config.put("acks", "all")
+    config.put("serializer.class", "kafka.serializer.StringEncoder")
+    producerConfig = new KafkaProducerConfig("kafka", "i001", config)
+    producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
+    metadataStore = new ClientUtilTopicMetadataStore(brokerList, "some-job-name")
+    systemAdmin = new KafkaSystemAdmin(SYSTEM, brokerList, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure))
+    systemAdmin.start()
+  }
+
+  @AfterClass
+  override def tearDown() {
+    systemAdmin.stop()
+    producer.close()
+    super.tearDown()
+  }
+
+  /** Creates `topicName` with `partitionCount` partitions and REPLICATION_FACTOR replicas via AdminUtils. */
+  def createTopic(topicName: String, partitionCount: Int) {
+    AdminUtils.createTopic(zkUtils, topicName, partitionCount, REPLICATION_FACTOR)
+  }
+
+  /**
+   * Waits until `topic` reports `expectedPartitionCount` partitions, failing the test after 100 attempts.
+   * Metadata errors and a not-yet-matching partition count both use up an attempt and back off 500ms.
+   */
+  def validateTopic(topic: String, expectedPartitionCount: Int) {
+    var done = false
+    var retries = 0
+    val maxRetries = 100
+    while (!done && retries < maxRetries) {
+      try {
+        val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), SYSTEM, metadataStore.getTopicInfo)
+        val topicMetadata = topicMetadataMap(topic)
+        KafkaUtil.maybeThrowException(topicMetadata.error.exception())
+        done = expectedPartitionCount == topicMetadata.partitionsMetadata.size
+      } catch {
+        case e: Exception =>
+          System.err.println("Got exception while validating test topics. Waiting and retrying. " + e)
+      }
+      if (!done) {
+        retries += 1
+        Thread.sleep(500)
+      }
+    }
+
+    if (!done) {
+      fail("Unable to successfully create topics. Tried to validate %s times." format retries)
+    }
+  }
+
+  /** Creates a ZooKeeper-configured consumer (group "test", auto.offset.reset=smallest) for the test cluster. */
+  def getConsumerConnector(): ConsumerConnector = {
+    val props = new Properties
+    props.put("zookeeper.connect", zkConnect)
+    props.put("group.id", "test")
+    props.put("auto.offset.reset", "smallest")
+    val consumerConfig = new ConsumerConfig(props)
+    Consumer.create(consumerConfig)
+  }
+
+  /** Creates a KafkaSystemAdmin for the test cluster with the given coordinator-stream settings and changelog info. */
+  def createSystemAdmin(coordinatorStreamProperties: Properties, coordinatorStreamReplicationFactor: Int, topicMetaInformation: Map[String, ChangelogInfo]): KafkaSystemAdmin = {
+    new KafkaSystemAdmin(SYSTEM, brokerList, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamProperties,
+      coordinatorStreamReplicationFactor, 10000, ConsumerConfig.SocketBufferSize, UUID.randomUUID.toString, topicMetaInformation, Map(), false)
+  }
+
+}
+
+/**
+ * Test creates a local ZK and Kafka cluster, and uses it to create and test
+ * topics to verify that offset APIs in SystemAdmin work as expected.
+ */
+class TestKafkaSystemAdmin {
+  import TestKafkaSystemAdmin._
+
+  @Test
+  def testShouldAssembleMetadata {
+    val oldestOffsets = Map(
+      new SystemStreamPartition(SYSTEM, "stream1", new Partition(0)) -> "o1",
+      new SystemStreamPartition(SYSTEM, "stream2", new Partition(0)) -> "o2",
+      new SystemStreamPartition(SYSTEM, "stream1", new Partition(1)) -> "o3",
+      new SystemStreamPartition(SYSTEM, "stream2", new Partition(1)) -> "o4")
+    val newestOffsets = Map(
+      new SystemStreamPartition(SYSTEM, "stream1", new Partition(0)) -> "n1",
+      new SystemStreamPartition(SYSTEM, "stream2", new Partition(0)) -> "n2",
+      new SystemStreamPartition(SYSTEM, "stream1", new Partition(1)) -> "n3",
+      new SystemStreamPartition(SYSTEM, "stream2", new Partition(1)) -> "n4")
+    val upcomingOffsets = Map(
+      new SystemStreamPartition(SYSTEM, "stream1", new Partition(0)) -> "u1",
+      new SystemStreamPartition(SYSTEM, "stream2", new Partition(0)) -> "u2",
+      new SystemStreamPartition(SYSTEM, "stream1", new Partition(1)) -> "u3",
+      new SystemStreamPartition(SYSTEM, "stream2", new Partition(1)) -> "u4")
+    val metadata = KafkaSystemAdmin.assembleMetadata(oldestOffsets, newestOffsets, upcomingOffsets)
+    assertNotNull(metadata)
+    assertEquals(2, metadata.size)
+    assertTrue(metadata.contains("stream1"))
+    assertTrue(metadata.contains("stream2"))
+    val stream1Metadata = metadata("stream1")
+    val stream2Metadata = metadata("stream2")
+    assertNotNull(stream1Metadata)
+    assertNotNull(stream2Metadata)
+    assertEquals("stream1", stream1Metadata.getStreamName)
+    assertEquals("stream2", stream2Metadata.getStreamName)
+    // Each partition's metadata combines the oldest/newest/upcoming offsets of the same SSP.
+    val expectedSystemStream1Partition0Metadata = new SystemStreamPartitionMetadata("o1", "n1", "u1")
+    val expectedSystemStream1Partition1Metadata = new SystemStreamPartitionMetadata("o3", "n3", "u3")
+    val expectedSystemStream2Partition0Metadata = new SystemStreamPartitionMetadata("o2", "n2", "u2")
+    val expectedSystemStream2Partition1Metadata = new SystemStreamPartitionMetadata("o4", "n4", "u4")
+    val stream1PartitionMetadata = stream1Metadata.getSystemStreamPartitionMetadata
+    val stream2PartitionMetadata = stream2Metadata.getSystemStreamPartitionMetadata
+    assertEquals(expectedSystemStream1Partition0Metadata, stream1PartitionMetadata.get(new Partition(0)))
+    assertEquals(expectedSystemStream1Partition1Metadata, stream1PartitionMetadata.get(new Partition(1)))
+    assertEquals(expectedSystemStream2Partition0Metadata, stream2PartitionMetadata.get(new Partition(0)))
+    assertEquals(expectedSystemStream2Partition1Metadata, stream2PartitionMetadata.get(new Partition(1)))
+  }
+
+  @Test
+  def testShouldGetOldestNewestAndNextOffsets {
+    // Create an empty topic with 50 partitions, but with no offsets.
+    createTopic(TOPIC, 50)
+    validateTopic(TOPIC, 50)
+
+    // Verify the empty topic behaves as expected.
+    var metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC).asJava)
+    assertEquals(1, metadata.size)
+    assertNotNull(metadata.get(TOPIC))
+    // Verify partition count.
+    var sspMetadata = metadata.get(TOPIC).getSystemStreamPartitionMetadata
+    assertEquals(50, sspMetadata.size)
+    // Empty topics should have null for latest offset and 0 for earliest offset
+    assertEquals("0", sspMetadata.get(new Partition(0)).getOldestOffset)
+    assertNull(sspMetadata.get(new Partition(0)).getNewestOffset)
+    // Empty Kafka topics should have a next offset of 0.
+    assertEquals("0", sspMetadata.get(new Partition(0)).getUpcomingOffset)
+
+    // Add a new message to partition 48, and verify that it works as expected.
+    producer.send(new ProducerRecord(TOPIC, 48, "key1".getBytes, "val1".getBytes)).get()
+    metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC).asJava)
+    assertEquals(1, metadata.size)
+    val streamName = metadata.keySet.asScala.head
+    assertEquals(TOPIC, streamName)
+    sspMetadata = metadata.get(streamName).getSystemStreamPartitionMetadata
+    // The record was produced explicitly to partition 48 (the ProducerRecord names the partition).
+    assertEquals("0", sspMetadata.get(new Partition(48)).getOldestOffset)
+    assertEquals("0", sspMetadata.get(new Partition(48)).getNewestOffset)
+    assertEquals("1", sspMetadata.get(new Partition(48)).getUpcomingOffset)
+    // Some other partition should be empty.
+    assertEquals("0", sspMetadata.get(new Partition(3)).getOldestOffset)
+    assertNull(sspMetadata.get(new Partition(3)).getNewestOffset)
+    assertEquals("0", sspMetadata.get(new Partition(3)).getUpcomingOffset)
+
+    // Add a second message to the same partition.
+    producer.send(new ProducerRecord(TOPIC, 48, "key1".getBytes, "val2".getBytes)).get()
+    metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC).asJava)
+    assertEquals(1, metadata.size)
+    // Check the refreshed metadata's stream name, not the value captured before the second send.
+    assertEquals(TOPIC, metadata.keySet.asScala.head)
+    sspMetadata = metadata.get(streamName).getSystemStreamPartitionMetadata
+    assertEquals("0", sspMetadata.get(new Partition(48)).getOldestOffset)
+    assertEquals("1", sspMetadata.get(new Partition(48)).getNewestOffset)
+    assertEquals("2", sspMetadata.get(new Partition(48)).getUpcomingOffset)
+
+    // Validate that a fetch returns both messages. The connector is shut down only after both
+    // reads, so the second read does not depend on data the iterator happened to buffer already.
+    val partition48Metadata = sspMetadata.get(new Partition(48))
+    val connector = getConsumerConnector
+    try {
+      val stream = connector.createMessageStreams(Map(TOPIC -> 1))(TOPIC).head.iterator
+      // First message should match the oldest expected offset.
+      val firstMessage = stream.next
+      assertEquals(partition48Metadata.getOldestOffset, firstMessage.offset.toString)
+      assertEquals("val1", new String(firstMessage.message, "UTF-8"))
+      // Second message should match the newest expected offset.
+      val secondMessage = stream.next
+      assertEquals(partition48Metadata.getNewestOffset, secondMessage.offset.toString)
+      assertEquals("val2", new String(secondMessage.message, "UTF-8"))
+    } finally {
+      connector.shutdown
+    }
+  }
+
+  @Test
+  def testNonExistentTopic {
+    val initialOffsets = systemAdmin.getSystemStreamMetadata(Set("non-existent-topic").asJava)
+    val metadata = initialOffsets.asScala.getOrElse("non-existent-topic", fail("missing metadata"))
+    // JUnit's assertEquals is (expected, actual); keep that order so failure messages read correctly.
+    assertEquals(new SystemStreamMetadata("non-existent-topic", Map(
+      new Partition(0) -> new SystemStreamPartitionMetadata("0", null, "0")).asJava), metadata)
+  }
+
+  @Test
+  def testOffsetsAfter {
+    val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
+    val ssp2 = new SystemStreamPartition("test-system", "test-stream", new Partition(1))
+    val offsetsAfter = systemAdmin.getOffsetsAfter(Map(
+      ssp1 -> "1",
+      ssp2 -> "2").asJava)
+    // Each SSP's "offset after" is its given offset plus one.
+    assertEquals("2", offsetsAfter.get(ssp1))
+    assertEquals("3", offsetsAfter.get(ssp2))
+  }
+
+  @Test
+  def testShouldCreateCoordinatorStream {
+    val topic = "test-coordinator-stream"
+    // A dedicated admin configured with a coordinator stream replication factor of 3.
+    val coordinatorSystemAdmin = new KafkaSystemAdmin(SYSTEM, brokerList, () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamReplicationFactor = 3)
+
+    val spec = StreamSpec.createCoordinatorStreamSpec(topic, "kafka")
+    coordinatorSystemAdmin.createStream(spec)
+    validateTopic(topic, 1)
+    val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), "kafka", metadataStore.getTopicInfo)
+    assertTrue(topicMetadataMap.contains(topic))
+    val topicMetadata = topicMetadataMap(topic)
+    val partitionMetadata = topicMetadata.partitionsMetadata.head
+    assertEquals(0, partitionMetadata.partitionId)
+    assertEquals(3, partitionMetadata.replicas.size)
+  }
+
+  /**
+   * KafkaSystemAdmin whose getTopicMetadata always reports LEADER_NOT_AVAILABLE for topic "quux"
+   * and counts how many times it has been called.
+   */
+  class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin(SYSTEM, brokerList, () => ZkUtils(zkConnect, 6000, 6000, zkSecure)) {
+    import kafka.api.TopicMetadata
+    var metadataCallCount = 0
+
+    // Simulate Kafka telling us that the leader for the topic is not available
+    override def getTopicMetadata(topics: Set[String]) = {
+      metadataCallCount += 1
+      val topicMetadata = TopicMetadata(topic = "quux", partitionsMetadata = Seq(), error = Errors.LEADER_NOT_AVAILABLE)
+      Map("quux" -> topicMetadata)
+    }
+  }
+
+  @Test
+  def testShouldRetryOnTopicMetadataError {
+    val systemAdmin = new KafkaSystemAdminWithTopicMetadataError
+    val retryBackoff = new ExponentialSleepStrategy.Mock(maxCalls = 3)
+    try {
+      systemAdmin.getSystemStreamMetadata(Set("quux").asJava, retryBackoff)
+      fail("expected CallLimitReached to be thrown")
+    } catch {
+      case _: ExponentialSleepStrategy.CallLimitReached => () // expected: retries exhausted
+    }
+  }
+
+  @Test
+  def testGetNewestOffset {
+    createTopic(TOPIC2, 16)
+    validateTopic(TOPIC2, 16)
+
+    val sspUnderTest = new SystemStreamPartition("kafka", TOPIC2, new Partition(4))
+    val otherSsp = new SystemStreamPartition("kafka", TOPIC2, new Partition(13))
+
+    // Empty partitions have no newest offset.
+    assertNull(systemAdmin.getNewestOffset(sspUnderTest, 3))
+    assertNull(systemAdmin.getNewestOffset(otherSsp, 3))
+
+    // Add a new message to one of the partitions, and verify that it works as expected.
+    assertEquals("0", producer.send(new ProducerRecord(TOPIC2, 4, "key1".getBytes, "val1".getBytes)).get().offset().toString)
+    assertEquals("0", systemAdmin.getNewestOffset(sspUnderTest, 3))
+    assertNull(systemAdmin.getNewestOffset(otherSsp, 3))
+
+    // Again
+    assertEquals("1", producer.send(new ProducerRecord(TOPIC2, 4, "key2".getBytes, "val2".getBytes)).get().offset().toString)
+    assertEquals("1", systemAdmin.getNewestOffset(sspUnderTest, 3))
+    assertNull(systemAdmin.getNewestOffset(otherSsp, 3))
+
+    // Add a message to both partitions
+    assertEquals("2", producer.send(new ProducerRecord(TOPIC2, 4, "key3".getBytes, "val3".getBytes)).get().offset().toString)
+    assertEquals("0", producer.send(new ProducerRecord(TOPIC2, 13, "key4".getBytes, "val4".getBytes)).get().offset().toString)
+    assertEquals("2", systemAdmin.getNewestOffset(sspUnderTest, 0))
+    assertEquals("0", systemAdmin.getNewestOffset(otherSsp, 0))
+  }
+
+  @Test (expected = classOf[LeaderNotAvailableException])
+  def testGetNewestOffsetMaxRetry {
+    val expectedRetryCount = 3
+    val systemAdmin = new KafkaSystemAdminWithTopicMetadataError
+    try {
+      systemAdmin.getNewestOffset(new SystemStreamPartition(SYSTEM, "quux", new Partition(0)), 3)
+    } catch {
+      case e: Exception =>
+        // One initial metadata lookup plus one per retry.
+        assertEquals(expectedRetryCount + 1, systemAdmin.metadataCallCount)
+        throw e
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/85d19bb1/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemConsumer.scala
new file mode 100644
index 0000000..ac37619
--- /dev/null
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemConsumer.scala
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software
Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka_deprecated
+
+import kafka.api.TopicMetadata
+import kafka.api.PartitionMetadata
+import kafka.cluster.Broker
+import kafka.common.TopicAndPartition
+import kafka.message.Message
+import kafka.message.MessageAndOffset
+import org.apache.kafka.common.protocol.Errors
+import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.Partition
+import org.apache.samza.util.TopicMetadataStore
+import org.junit.Test
+import org.junit.Assert._
+import org.apache.samza.system.SystemAdmin
+import org.mockito.Mockito._
+import org.mockito.Matchers._
+
+/**
+ * Unit tests for the deprecated KafkaSystemConsumer. They cover how fetch thresholds are split across
+ * partitions, broker proxy creation, offset registration, and queued-byte accounting. Tests that start
+ * the consumer stub out refreshBrokers (or createBrokerProxy).
+ */
+class TestKafkaSystemConsumer {
+  val systemAdmin: SystemAdmin = mock(classOf[KafkaSystemAdmin])
+  private val clientId = "TestClientId"
+
+  @Test
+  def testFetchThresholdShouldDivideEvenlyAmongPartitions {
+    val metadataStore = new MockMetadataStore
+    val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId, fetchThreshold = 50000) {
+      override def refreshBrokers {
+      }
+    }
+
+    for (i <- 0 until 50) {
+      consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
+    }
+
+    consumer.start
+
+    // 50000 messages split across 50 registered partitions.
+    assertEquals(1000, consumer.perPartitionFetchThreshold)
+  }
+
+  @Test
+  def testBrokerCreationShouldTriggerStart {
+    val systemName = "test-system"
+    val streamName = "test-stream"
+    val metrics = new KafkaSystemConsumerMetrics
+    // Lie and tell the store that the partition metadata is empty. We can't
+    // use partition metadata because it has Broker in its constructor, which
+    // is package private to Kafka.
+    val metadataStore = new MockMetadataStore(Map(streamName -> TopicMetadata(streamName, Seq.empty, Errors.NONE)))
+    var hosts = List[String]()
+    var getHostPortCount = 0
+    val consumer = new KafkaSystemConsumer(systemName, systemAdmin, metrics, metadataStore, clientId) {
+      override def getLeaderHostPort(partitionMetadata: Option[PartitionMetadata]): Option[(String, Int)] = {
+        // Generate a unique host every time getLeaderHostPort is called.
+        getHostPortCount += 1
+        // Build the (host, port) tuple explicitly instead of relying on argument auto-tupling.
+        Some(("localhost-%s" format getHostPortCount, 0))
+      }
+
+      override def createBrokerProxy(host: String, port: Int): BrokerProxy = {
+        new BrokerProxy(host, port, systemName, "", metrics, sink) {
+          override def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
+            // Skip this since we normally do verification of offsets, which
+            // tries to connect to Kafka. Rather than mock that, just forget it.
+            nextOffsets.size
+          }
+
+          override def start {
+            hosts :+= host
+          }
+        }
+      }
+    }
+
+    consumer.register(new SystemStreamPartition(systemName, streamName, new Partition(0)), "1")
+    assertEquals(0, hosts.size)
+    consumer.start
+    assertEquals(List("localhost-1"), hosts)
+    // Should trigger a refresh with a new host.
+    consumer.sink.abdicate(new TopicAndPartition(streamName, 0), 2)
+    assertEquals(List("localhost-1", "localhost-2"), hosts)
+  }
+
+  @Test
+  def testConsumerRegisterOlderOffsetOfTheSamzaSSP {
+    when(systemAdmin.offsetComparator(anyString, anyString)).thenCallRealMethod()
+
+    val metadataStore = new MockMetadataStore
+    val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId, fetchThreshold = 50000)
+    val ssp0 = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
+    val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(1))
+    val ssp2 = new SystemStreamPartition("test-system", "test-stream", new Partition(2))
+
+    // Registering an SSP more than once keeps the older of the registered offsets.
+    consumer.register(ssp0, "0")
+    consumer.register(ssp0, "5")
+    consumer.register(ssp1, "2")
+    consumer.register(ssp1, "3")
+    consumer.register(ssp2, "0")
+
+    assertEquals("0", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp0)))
+    assertEquals("2", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp1)))
+    assertEquals("0", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp2)))
+  }
+
+  @Test
+  def testFetchThresholdBytesShouldDivideEvenlyAmongPartitions {
+    val metadataStore = new MockMetadataStore
+    val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId,
+      fetchThreshold = 50000, fetchThresholdBytes = 60000L, fetchLimitByBytesEnabled = true) {
+      override def refreshBrokers {
+      }
+    }
+
+    for (i <- 0 until 10) {
+      consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
+    }
+
+    consumer.start
+
+    assertEquals(5000, consumer.perPartitionFetchThreshold)
+    // NOTE(review): 60000 bytes over 10 partitions is asserted as 3000 (not 6000). That implies the
+    // byte threshold is halved before being split; confirm against KafkaSystemConsumer.
+    assertEquals(3000, consumer.perPartitionFetchThresholdBytes)
+  }
+
+  @Test
+  def testFetchThresholdBytes {
+    val metadataStore = new MockMetadataStore
+    val consumer = new KafkaSystemConsumer("test-system", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId,
+      fetchThreshold = 50000, fetchThresholdBytes = 60000L, fetchLimitByBytesEnabled = true) {
+      override def refreshBrokers {
+      }
+    }
+
+    for (i <- 0 until 10) {
+      consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
+    }
+
+    consumer.start
+
+    val msg = Array[Byte](5, 112, 9, 126)
+    val msgAndOffset: MessageAndOffset = MessageAndOffset(new Message(msg), 887654)
+    // 4 data + 22 Message overhead + 80 IncomingMessageEnvelope overhead = 106.
+    // The 22-byte overhead assumes Kafka's v1 message layout (crc, magic, attributes, timestamp,
+    // key and value lengths).
+    consumer.sink.addMessage(new TopicAndPartition("test-stream", 0), msgAndOffset, 887354)
+
+    assertEquals(106, consumer.getMessagesSizeInQueue(new SystemStreamPartition("test-system", "test-stream", new Partition(0))))
+  }
+
+  @Test
+  def testFetchThresholdBytesDisabled {
+    val metadataStore = new MockMetadataStore
+    // fetchLimitByBytesEnabled is left at its default here.
+    val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId,
+      fetchThreshold = 50000, fetchThresholdBytes = 60000L) {
+      override def refreshBrokers {
+      }
+    }
+
+    for (i <- 0 until 10) {
+      consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
+    }
+
+    consumer.start
+
+    assertEquals(5000, consumer.perPartitionFetchThreshold)
+    assertEquals(0, consumer.perPartitionFetchThresholdBytes)
+    assertEquals(0, consumer.getMessagesSizeInQueue(new SystemStreamPartition("test-system", "test-stream", new Partition(0))))
+  }
+}
+
+/** TopicMetadataStore stub that returns the same fixed metadata map regardless of the requested topics. */
+class MockMetadataStore(var metadata: Map[String, TopicMetadata] = Map()) extends TopicMetadataStore {
+  def getTopicInfo(topics: Set[String]): Map[String, TopicMetadata] = metadata
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/85d19bb1/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemFactory.scala
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemFactory.scala new file mode 100644 index 0000000..41a48f3 --- /dev/null +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemFactory.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.samza.system.kafka_deprecated
+
+import org.apache.samza.SamzaException
+import org.apache.samza.config.MapConfig
+import org.apache.samza.config.StorageConfig
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+/**
+ * Tests for the deprecated KafkaSystemFactory. They cover producer creation with missing, invalid
+ * and valid serde configuration, and the producer properties the factory injects per system.
+ */
+class TestKafkaSystemFactory {
+  @Test
+  def testFailWhenNoSerdeDefined {
+    assertProducerCreationFails(new MapConfig(Map[String, String]().asJava))
+  }
+
+  @Test
+  def testFailWhenSerdeIsInvalid {
+    assertProducerCreationFails(new MapConfig(Map[String, String](
+      "streams.test.serde" -> "failme").asJava))
+  }
+
+  @Test
+  def testHappyPath {
+    val producerFactory = new KafkaSystemFactory
+    val config = new MapConfig(Map[String, String](
+      "job.name" -> "test",
+      "systems.test.producer.bootstrap.servers" -> "",
+      "systems.test.samza.key.serde" -> "json",
+      "systems.test.samza.msg.serde" -> "json",
+      "serializers.registry.json.class" -> "samza.serializers.JsonSerdeFactory").asJava)
+    // Ask the same factory for a producer twice; each request must yield a KafkaSystemProducer.
+    for (_ <- 1 to 2) {
+      val producer = producerFactory.getProducer(
+        "test",
+        config,
+        new MetricsRegistryMap)
+      assertNotNull(producer)
+      assertTrue(producer.isInstanceOf[KafkaSystemProducer])
+    }
+  }
+
+  @Test
+  def testInjectedProducerProps {
+    val configMap = Map[String, String](
+      StorageConfig.FACTORY.format("system1") -> "some.factory.Class",
+      StorageConfig.CHANGELOG_STREAM.format("system1") -> "system1.stream1",
+      StorageConfig.FACTORY.format("system2") -> "some.factory.Class")
+    val config = new MapConfig(configMap.asJava)
+    // Only "system1" appears in a changelog stream ("system1.stream1"), and only it gets the
+    // injected compression setting.
+    assertEquals(Map[String, String](), KafkaSystemFactory.getInjectedProducerProperties("system3", config))
+    assertEquals(Map[String, String](), KafkaSystemFactory.getInjectedProducerProperties("system2", config))
+    assertEquals(Map[String, String]("compression.type" -> "none"), KafkaSystemFactory.getInjectedProducerProperties("system1", config))
+  }
+
+  /**
+   * Asserts that a fresh KafkaSystemFactory throws a SamzaException when asked for a producer for
+   * system "test" under `config`. Any other exception, or no exception at all, fails the test.
+   */
+  private def assertProducerCreationFails(config: MapConfig) {
+    val producerFactory = new KafkaSystemFactory
+    try {
+      producerFactory.getProducer(
+        "test",
+        config,
+        new MetricsRegistryMap)
+      fail("Expected to get a Samza exception.")
+    } catch {
+      case _: SamzaException => () // expected
+      case e: Exception => fail("Expected SamzaException, but got " + e)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/85d19bb1/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemProducer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemProducer.scala
new file mode 100644
index 0000000..16a1287
--- /dev/null
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemProducer.scala
@@ -0,0 +1,604 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.kafka_deprecated + +import org.apache.kafka.clients.producer._ +import org.apache.kafka.common.errors.{RecordTooLargeException, SerializationException, TimeoutException} +import org.apache.kafka.test.MockSerializer +import org.apache.samza.system.kafka.MockKafkaProducer +import org.apache.samza.system.{OutgoingMessageEnvelope, SystemProducerException, SystemStream} +import org.junit.Assert._ +import org.junit.Test +import org.scalatest.Assertions.intercept + + +class TestKafkaSystemProducer { + val systemStream = new SystemStream("testSystem", "testStream") + val someMessage = new OutgoingMessageEnvelope(systemStream, "test".getBytes) + + @Test + def testKafkaProducer { + val mockProducer = new MockProducer(true, new MockSerializer, new MockSerializer) + val systemProducer = new KafkaSystemProducer(systemName = "test", + getProducer = () => mockProducer, + metrics = new KafkaSystemProducerMetrics) + systemProducer.register("test") + systemProducer.start + systemProducer.send("test", someMessage) + assertEquals(1, systemProducer.producerRef.get().asInstanceOf[MockProducer[Array[Byte], Array[Byte]]].history().size()) + systemProducer.stop + } + + @Test + def testKafkaProducerUsingMockKafkaProducer { + val mockProducer = new MockKafkaProducer(1, "test", 1) + val systemProducer = new KafkaSystemProducer(systemName = "test", + getProducer = () => mockProducer, + metrics = new KafkaSystemProducerMetrics) + systemProducer.register("test") + systemProducer.start() + systemProducer.send("test", someMessage) + assertEquals(1, mockProducer.getMsgsSent) + systemProducer.stop() + } + + @Test + def testKafkaProducerBufferedSend { + val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes) + val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes) + val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes) + 
+ val mockProducer = new MockKafkaProducer(1, "test", 1) + val producerMetrics = new KafkaSystemProducerMetrics + val systemProducer = new KafkaSystemProducer(systemName = "test", + getProducer = () => mockProducer, + metrics = producerMetrics) + systemProducer.register("test") + systemProducer.start() + systemProducer.send("test", msg1) + + mockProducer.setShouldBuffer(true) + systemProducer.send("test", msg2) + systemProducer.send("test", msg3) + assertEquals(1, mockProducer.getMsgsSent) + + val sendThread: Thread = mockProducer.startDelayedSendThread(2000) + sendThread.join() + + assertEquals(3, mockProducer.getMsgsSent) + systemProducer.stop() + } + + @Test + def testKafkaProducerFlushSuccessful { + val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes) + val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes) + val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes) + + val mockProducer = new MockKafkaProducer(1, "test", 1) + val systemProducer = new KafkaSystemProducer(systemName = "test", + getProducer = () => mockProducer, + metrics = new KafkaSystemProducerMetrics) + systemProducer.register("test") + systemProducer.start() + systemProducer.send("test", msg1) + + mockProducer.setShouldBuffer(true) + systemProducer.send("test", msg2) + systemProducer.send("test", msg3) + assertEquals(1, mockProducer.getMsgsSent) + mockProducer.startDelayedSendThread(2000) + systemProducer.flush("test") + assertEquals(3, mockProducer.getMsgsSent) + systemProducer.stop() + } + + @Test + def testKafkaProducerFlushWithException { + val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes) + val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes) + val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes) + val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes) + + val mockProducer = new MockKafkaProducer(1, "test", 1) + val systemProducer = new KafkaSystemProducer(systemName = "test", + getProducer = () 
=> mockProducer, + metrics = new KafkaSystemProducerMetrics()) + systemProducer.register("test") + systemProducer.start() + systemProducer.send("test", msg1) + + mockProducer.setShouldBuffer(true) + systemProducer.send("test", msg2) + mockProducer.setErrorNext(true, true, new RecordTooLargeException()) + systemProducer.send("test", msg3) + systemProducer.send("test", msg4) + + assertEquals(1, mockProducer.getMsgsSent) + + mockProducer.startDelayedSendThread(2000) + val thrown = intercept[SystemProducerException] { + systemProducer.flush("test") + } + assertTrue(thrown.isInstanceOf[SystemProducerException]) + assertEquals(3, mockProducer.getMsgsSent) // msg1, msg2 and msg4 will be sent + systemProducer.stop() + } + + @Test + def testKafkaProducerWithRetriableException { + val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes) + val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes) + val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes) + val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes) + + val mockProducer = new MockKafkaProducer(1, "test", 1) + val producerMetrics = new KafkaSystemProducerMetrics() + val producer = new KafkaSystemProducer(systemName = "test", + getProducer = () => mockProducer, + metrics = producerMetrics) + + producer.register("test") + producer.start() + producer.send("test", msg1) + producer.send("test", msg2) + producer.send("test", msg3) + producer.flush("test") + + mockProducer.setErrorNext(true, true, new TimeoutException()) + + producer.send("test", msg4) + val thrown = intercept[SystemProducerException] { + producer.flush("test") + } + assertTrue(thrown.isInstanceOf[SystemProducerException]) + assertTrue(thrown.getCause.getCause.isInstanceOf[TimeoutException]) + assertEquals(3, mockProducer.getMsgsSent) + producer.stop() + } + + /** + * If there's an exception, we should: + * 1. 
Close the producer (from the one-and-only kafka send thread) to prevent subsequent sends from going out of order. + * 2. Record the original exception + * 3. Throw the exception every time a KafkaSystemProducer method is invoked until the container fails. + * + * Assumptions: + * 1. SystemProducer.flush() can happen concurrently with SystemProducer.send() for a particular TaskInstance (task.async.commit) + * 2. SystemProducer.flush() cannot happen concurrently with itself for a particular task instance + * 3. Any exception thrown from SystemProducer.flush() will prevent the checkpointing and fail the container + * 4. A single KafkaProducer is shared by all the tasks so any failure from one task can affect the others. + * + * Conclusions: + * It is only safe to handle the async exceptions from by closing the producer and failing the container. + * This prevents race conditons with setting/clearing exceptions and recreating the producer that could cause data + * loss by checkpointing a failed offset. + * + * Inaccuracies: + * A real kafka producer succeeds or fails all the messages in a batch. In other words, the messages of a batch all + * fail or they all succeed together. This test, however, fails individual callbacks in order to test boundary + * conditions where the batches align perfectly around the failed send(). 
+ */ + @Test + def testKafkaProducerWithFatalExceptions { + val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes) + val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes) + val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes) + val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes) + val msg5 = new OutgoingMessageEnvelope(systemStream, "e".getBytes) + val producerMetrics = new KafkaSystemProducerMetrics() + + val mockProducer = new MockKafkaProducer(1, "test", 1) + val producer = new KafkaSystemProducer(systemName = "test", + getProducer = () => { + mockProducer.open() // A new producer would not already be closed, so reset it. + mockProducer + }, + metrics = producerMetrics) + producer.register("test") + producer.start() + + producer.send("test", msg1) + producer.send("test", msg2) + mockProducer.setErrorNext(true, true, new RecordTooLargeException()) + producer.send("test", msg3) // Callback exception + assertTrue(mockProducer.isClosed) + assertNotNull(producer.producerRef.get()) + assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount) + + val senderException = intercept[SystemProducerException] { + producer.send("test", msg4) // Should fail because the producer is closed. 
+ } + assertTrue(senderException.getCause.getCause.isInstanceOf[RecordTooLargeException]) + + val callbackException = intercept[SystemProducerException] { + producer.flush("test") // Should throw the callback exception + } + assertTrue(callbackException.getCause.getCause.isInstanceOf[RecordTooLargeException]) + + val postFlushException = intercept[SystemProducerException] { + producer.send("test", msg5) // Should not be able to send again after flush + } + assertTrue(postFlushException.getCause.getCause.isInstanceOf[RecordTooLargeException]) + + val callbackException2 = intercept[SystemProducerException] { + producer.flush("test") // Should rethrow the exception + } + assertTrue(callbackException2.getCause.getCause.isInstanceOf[RecordTooLargeException]) + assertEquals(2, mockProducer.getMsgsSent) // only the messages before the error get sent + producer.stop() + } + + /** + * Recapping from [[testKafkaProducerWithFatalExceptions]]: + * + * If there's an exception, we should: + * 1. Close the producer (from the one-and-only kafka send thread) to prevent subsequent sends from going out of order. + * 2. Record the original exception + * 3. Throw the exception every time a KafkaSystemProducer method is invoked until the container fails. + * + * This test focuses on point 3. Particularly it ensures that the failures are handled properly across multiple sources + * which share the same producer. 
+ */ + @Test + def testKafkaProducerWithFatalExceptionsMultipleSources { + val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes) + val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes) + val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes) + val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes) + val msg5 = new OutgoingMessageEnvelope(systemStream, "e".getBytes) + val msg6 = new OutgoingMessageEnvelope(systemStream, "f".getBytes) + val msg7 = new OutgoingMessageEnvelope(systemStream, "g".getBytes) + val producerMetrics = new KafkaSystemProducerMetrics() + + val mockProducer = new MockKafkaProducer(1, "test", 1) + val producer = new KafkaSystemProducer(systemName = "test", + getProducer = () => { + mockProducer.open() // A new producer would not already be closed, so reset it. + mockProducer + }, + metrics = producerMetrics) + producer.register("test1") + producer.register("test2") + + producer.start() + + // Initial sends + producer.send("test1", msg1) + producer.send("test2", msg2) + + // Inject error for next send + mockProducer.setErrorNext(true, true, new RecordTooLargeException()) + producer.send("test1", msg3) // Callback exception + assertTrue(mockProducer.isClosed) + assertNotNull(producer.producerRef.get()) + assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount) + + // Subsequent sends + val senderException = intercept[SystemProducerException] { + producer.send("test1", msg4) // Should fail because the producer is closed. 
+    }
+    assertTrue(senderException.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+    val callbackException = intercept[SystemProducerException] {
+      producer.send("test2", msg4) // First send from separate source gets a producer closed exception
+    }
+    assertTrue(callbackException.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+    val callbackException2 = intercept[SystemProducerException] {
+      producer.send("test2", msg5) // Second send should still get the error
+    }
+    assertTrue(callbackException2.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+    // Flushes
+    val callbackException3 = intercept[SystemProducerException] {
+      producer.flush("test2") // Should rethrow the closed exception in flush
+    }
+    assertTrue(callbackException3.isInstanceOf[SystemProducerException])
+    assertTrue(callbackException3.getCause.getCause.isInstanceOf[RecordTooLargeException])
+    intercept[SystemProducerException] {
+      producer.send("test2", msg6) // Should still not be able to send after flush
+    }
+
+    val thrown3 = intercept[SystemProducerException] {
+      producer.flush("test1") // Should throw the callback exception
+    }
+    assertTrue(thrown3.isInstanceOf[SystemProducerException])
+    assertTrue(thrown3.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+    intercept[SystemProducerException] {
+      producer.send("test1", msg7) // Should still not be able to send after flush
+    }
+
+    intercept[SystemProducerException] {
+      producer.flush("test1") // Should throw the callback exception
+    }
+    assertEquals(2, mockProducer.getMsgsSent)
+    producer.stop()
+  }
+
+  /**
+   * A synchronous (user-thread) SerializationException from send() is surfaced as a SystemProducerException,
+   * but it must not close or recreate the shared producer: the failed message can be resent, the other
+   * source is unaffected, and flush keeps working.
+   */
+  @Test
+  def testKafkaProducerWithNonFatalExceptionsMultipleSources {
+    val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+    val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+    val msg5 = new OutgoingMessageEnvelope(systemStream, "e".getBytes)
+    val producerMetrics = new KafkaSystemProducerMetrics()
+
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val producer = new KafkaSystemProducer(systemName = "test",
+      getProducer = () => {
+        mockProducer.open() // A new producer would not already be closed, so reset it.
+        mockProducer
+      },
+      metrics = producerMetrics)
+    producer.register("test1")
+    producer.register("test2")
+    producer.start()
+
+    producer.send("test1", msg1)
+    producer.send("test2", msg2)
+    mockProducer.setErrorNext(true, false, new SerializationException())
+    val sendException = intercept[SystemProducerException] {
+      producer.send("test1", msg3) // User-thread exception
+    }
+    assertTrue(sendException.getCause.isInstanceOf[SerializationException])
+    assertFalse(mockProducer.isClosed)
+    assertNotNull(producer.producerRef.get())
+    assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)
+
+    producer.send("test1", msg3) // Should be able to resend msg3
+    producer.send("test2", msg4) // Second source should not be affected
+
+    producer.flush("test1") // Flush should be unaffected
+
+    producer.send("test1", msg5) // Should be able to send again after flush
+
+    assertEquals(5, mockProducer.getMsgsSent) // msg1-msg5 are all sent: the rejected msg3 was resent successfully
+    producer.stop()
+  }
+
+  /**
+   * If there's an exception and the user configured task.drop.producer.errors, we should:
+   * 1. Close the producer (from the one-and-only kafka send thread) to prevent subsequent sends from going out of order.
+   * 2. Recreate the producer.
+   * 3. Ignore any messages that were dropped (user knows they're signing up for this if they enable the option)
+   *
+   * Assumptions:
+   * 1. SystemProducer.flush() can happen concurrently with SystemProducer.send() for a particular TaskInstance (task.async.commit)
+   * 2. SystemProducer.flush() cannot happen concurrently with itself for a particular task instance
+   * 3. Any exception thrown from SystemProducer.flush() will prevent the checkpointing and fail the container
+   * 4. A single KafkaProducer is shared by all the tasks so any failure from one task can affect the others.
+   *
+   * Conclusions:
+   * If the user is ok with dropping messages for the sake of availability, we will swallow all exceptions and
+   * recreate the producer to recover. There are no guarantees how many messages are lost, but the send-failed metric
+   * should be accurate, so users should alert on that.
+   *
+   * Inaccuracies:
+   * A real kafka producer succeeds or fails all the messages in a batch. In other words, the messages of a batch all
+   * fail or they all succeed together. This test, however, fails individual callbacks in order to test boundary
+   * conditions where the batches align perfectly around the failed send().
+   */
+  @Test
+  def testKafkaProducerWithFatalExceptionsDroppingExceptions {
+    val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+    val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+    val msg5 = new OutgoingMessageEnvelope(systemStream, "e".getBytes)
+    val producerMetrics = new KafkaSystemProducerMetrics()
+
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val producer = new KafkaSystemProducer(systemName = "test",
+      getProducer = () => {
+        mockProducer.open() // A new producer would not already be closed, so reset it.
+        mockProducer
+      },
+      metrics = producerMetrics,
+      dropProducerExceptions = true) // Here's where we enable exception dropping.
+    producer.register("test")
+    producer.start()
+
+    producer.send("test", msg1)
+    producer.send("test", msg2)
+    mockProducer.setErrorNext(true, true, new RecordTooLargeException())
+    producer.send("test", msg3) // Callback exception
+    assertTrue(mockProducer.isClosed)
+    assertNull(producer.producerRef.get())
+    assertEquals("Should not have created a new producer", 1, mockProducer.getOpenCount)
+
+    producer.send("test", msg4) // Should succeed because the producer recovered.
+    assertFalse(mockProducer.isClosed)
+    assertNotNull(producer.producerRef.get())
+    assertEquals("Should have created a new producer", 2, mockProducer.getOpenCount)
+    producer.flush("test") // Should not throw
+
+    producer.send("test", msg5) // Should be able to send again after flush
+    assertEquals("Should not have created a new producer", 2, mockProducer.getOpenCount)
+    producer.flush("test")
+
+    assertEquals(4, mockProducer.getMsgsSent) // every message except the one with the error should get sent
+    producer.stop()
+  }
+
+  /**
+   * Recapping from [[testKafkaProducerWithFatalExceptionsDroppingExceptions]]:
+   *
+   * If there's an exception, we should:
+   * 1. Close the producer (from the one-and-only kafka send thread) to prevent subsequent sends from going out of order.
+   * 2. Recreate the producer.
+   * 3. Ignore any messages that were dropped (user knows they're signing up for this if they enable the option)
+   *
+   * This test ensures that the failures are handled properly across multiple sources
+   * which share the same producer.
+   */
+  @Test
+  def testKafkaProducerWithFatalExceptionsMultipleSourcesDroppingExceptions {
+    val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+    val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+    val msg5 = new OutgoingMessageEnvelope(systemStream, "e".getBytes)
+    val msg6 = new OutgoingMessageEnvelope(systemStream, "f".getBytes)
+    val msg7 = new OutgoingMessageEnvelope(systemStream, "g".getBytes)
+    val producerMetrics = new KafkaSystemProducerMetrics()
+
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val producer = new KafkaSystemProducer(systemName = "test",
+      getProducer = () => {
+        mockProducer.open() // A new producer would not already be closed, so reset it.
+        mockProducer
+      },
+      metrics = producerMetrics,
+      dropProducerExceptions = true) // Here's where we enable exception dropping.
+    producer.register("test1")
+    producer.register("test2")
+
+    producer.start()
+
+    // Initial sends
+    producer.send("test1", msg1)
+    producer.send("test2", msg2)
+
+    // Inject error for next send
+    mockProducer.setErrorNext(true, true, new RecordTooLargeException())
+    producer.send("test1", msg3) // Callback exception
+    assertTrue(mockProducer.isClosed)
+    assertNull(producer.producerRef.get())
+    assertEquals("Should not have created a new producer", 1, mockProducer.getOpenCount)
+
+    // Subsequent sends
+    producer.send("test1", msg4) // Should succeed because the producer recovered.
+    assertFalse(mockProducer.isClosed)
+    assertEquals("Should have created a new producer", 2, mockProducer.getOpenCount)
+    assertNotNull(producer.producerRef.get())
+    producer.send("test2", msg5) // Second source should also not have any error.
+    assertEquals("Should not have created a new producer", 2, mockProducer.getOpenCount)
+
+    // Flushes
+    producer.flush("test2") // Should not throw for test2
+    producer.send("test2", msg6) // Should still work after flush
+
+    producer.flush("test1") // Should not throw for test1 either
+    producer.send("test1", msg7)
+
+    assertEquals(6, mockProducer.getMsgsSent) // every message except the one with the error should get sent
+    producer.stop()
+  }
+
+  /**
+   * Even with dropProducerExceptions = true, a synchronous (user-thread) SerializationException from send()
+   * is still thrown to the caller as a SystemProducerException, and the shared producer is neither closed
+   * nor recreated.
+   */
+  @Test
+  def testKafkaProducerWithNonFatalExceptionsMultipleSourcesDroppingExceptions {
+    val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+    val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+    val msg5 = new OutgoingMessageEnvelope(systemStream, "e".getBytes)
+    val producerMetrics = new KafkaSystemProducerMetrics()
+
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val producer = new KafkaSystemProducer(systemName = "test",
+      getProducer = () => {
+        mockProducer.open() // A new producer would not already be closed, so reset it.
+        mockProducer
+      },
+      metrics = producerMetrics,
+      dropProducerExceptions = true) // Here's where we enable exception dropping.
+    producer.register("test1")
+    producer.register("test2")
+    producer.start()
+
+    producer.send("test1", msg1)
+    producer.send("test2", msg2)
+    mockProducer.setErrorNext(true, false, new SerializationException())
+    val sendException = intercept[SystemProducerException] {
+      producer.send("test1", msg3) // User-thread exception
+    }
+    assertTrue(sendException.getCause.isInstanceOf[SerializationException])
+    assertFalse(mockProducer.isClosed)
+    assertNotNull(producer.producerRef.get()) // Synchronous error; producer should not be recreated
+    assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)
+
+    producer.send("test1", msg3) // Should be able to resend msg3
+    assertFalse(mockProducer.isClosed)
+    assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)
+    assertNotNull(producer.producerRef.get())
+    producer.send("test2", msg4) // Second source should not be affected
+
+    producer.flush("test1") // Flush should be unaffected
+
+    producer.send("test1", msg5) // Should be able to send again after flush
+
+    assertEquals(5, mockProducer.getMsgsSent) // msg1-msg5 are all sent: the rejected msg3 was resent successfully
+    producer.stop()
+  }
+
+  /**
+   * Messages still buffered in the underlying producer when stop() is called must have been sent by the time
+   * stop() returns.
+   */
+  @Test
+  def testKafkaProducerFlushMsgsWhenStop {
+    val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+    val msg4 = new OutgoingMessageEnvelope(new SystemStream("test2", "test"), "d".getBytes)
+
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val systemProducer = new KafkaSystemProducer(systemName = "test",
+      getProducer = () => mockProducer,
+      metrics = new KafkaSystemProducerMetrics)
+    systemProducer.register("test")
+    systemProducer.register("test2")
+    systemProducer.start()
+    systemProducer.send("test", msg1)
+
+    mockProducer.setShouldBuffer(true)
+    systemProducer.send("test", msg2)
+    systemProducer.send("test", msg3)
+    systemProducer.send("test2", msg4)
+    assertEquals(1, mockProducer.getMsgsSent)
+    // Presumably releases the buffered messages after a 2000 ms delay; nothing has been released yet below.
+    mockProducer.startDelayedSendThread(2000)
+    assertEquals(1, mockProducer.getMsgsSent)
+    systemProducer.stop()
+    assertEquals(4, mockProducer.getMsgsSent)
+  }
+
+  /**
+   * send() must reject envelopes whose stream name is null or empty with an IllegalArgumentException,
+   * without anything reaching the underlying producer.
+   */
+  @Test
+  def testSystemStreamNameNullOrEmpty {
+    val omeStreamNameNull = new OutgoingMessageEnvelope(new SystemStream("test", null), "a".getBytes)
+    val omeStreamNameEmpty = new OutgoingMessageEnvelope(new SystemStream("test", ""), "a".getBytes)
+    val mockProducer = new MockKafkaProducer(1, "testMock", 1)
+    val producer = new KafkaSystemProducer(systemName = "test", getProducer = () => mockProducer,
+      metrics = new KafkaSystemProducerMetrics)
+    // Set up outside of intercept so that only send() can satisfy the expected exception.
+    producer.register("test1")
+    producer.register("test2")
+    producer.start()
+
+    val thrownNull = intercept[IllegalArgumentException] {
+      producer.send("testSrc1", omeStreamNameNull)
+    }
+    val thrownEmpty = intercept[IllegalArgumentException] {
+      producer.send("testSrc2", omeStreamNameEmpty)
+    }
+    // Checked after the intercepts: anything placed after the throwing send() inside intercept never runs.
+    assertEquals(0, mockProducer.getMsgsSent)
+    assertEquals("Invalid system stream: " + omeStreamNameNull.getSystemStream, thrownNull.getMessage)
+    assertEquals("Invalid system stream: " + omeStreamNameEmpty.getSystemStream, thrownEmpty.getMessage)
+    producer.stop()
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/85d19bb1/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestTopicMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestTopicMetadataCache.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestTopicMetadataCache.scala
new file mode 100644
index 0000000..dda011b
--- /dev/null
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestTopicMetadataCache.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or
more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka_deprecated
+
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+
+import kafka.api.TopicMetadata
+import org.I0Itec.zkclient.ZkClient
+import org.apache.samza.util.Clock
+import org.apache.samza.util.TopicMetadataStore
+import org.junit.Assert._
+import org.junit.Before
+import org.junit.Test
+import kafka.common.ErrorMapping
+import kafka.api.PartitionMetadata
+import org.apache.kafka.common.protocol.Errors
+
+import scala.util.control.NonFatal
+
+/**
+ * Tests for [[TopicMetadataCache]]: when cached topic metadata is refreshed from the store,
+ * concurrent lookups, and which error codes count as "bad".
+ */
+class TestTopicMetadataCache {
+
+  /** Clock whose current time is set explicitly by each test. */
+  class MockTime extends Clock {
+    var currentValue = 0
+
+    def currentTimeMillis: Long = currentValue
+  }
+
+  /**
+   * In-memory store serving canned metadata for "topic1" and "topic2" and counting how often it is queried.
+   */
+  class MockTopicMetadataStore extends TopicMetadataStore {
+    var mockCache = Map(
+      "topic1" -> new TopicMetadata("topic1", List.empty, Errors.NONE),
+      "topic2" -> new TopicMetadata("topic2", List.empty, Errors.NONE))
+    val numberOfCalls: AtomicInteger = new AtomicInteger(0)
+
+    def getTopicInfo(topics: Set[String]) = {
+      // Built before counting, so a lookup of an unknown topic (which throws) is not counted as a call.
+      val topicMetadata = topics.map(topic => topic -> mockCache(topic)).toMap
+      numberOfCalls.getAndIncrement
+      topicMetadata
+    }
+
+    /** Replaces the canned metadata for `topic` with metadata carrying the given Kafka error code. */
+    def setErrorCode(topic: String, errorCode: Short) {
+      mockCache += topic -> new TopicMetadata(topic, List.empty, Errors.forCode(errorCode))
+    }
+  }
+
+  /** TopicMetadataCache is shared state; reset it so tests cannot see each other's entries. */
+  @Before def setup {
+    TopicMetadataCache.clear
+  }
+
+  /**
+   * The store is queried on a cache miss, when the cached entry carries an error, and when the entry has aged
+   * past the TTL; otherwise the cached entry is served. The `5` passed to getTopicMetadata is presumably that
+   * TTL in milliseconds (consistent with the time steps below).
+   */
+  @Test
+  def testBasicMetadataCacheFunctionality {
+    val mockStore = new MockTopicMetadataStore
+    val mockTime = new MockTime
+
+    // Cache is initially empty, so the store is queried. The store returns an error for topic1.
+    mockStore.setErrorCode("topic1", Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
+    var metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
+    assertEquals("topic1", metadata("topic1").topic)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, metadata("topic1").error)
+    assertEquals(1, mockStore.numberOfCalls.get())
+
+    // The cached entry has an error, so the store must be queried again to refresh it.
+    mockTime.currentValue = 5
+    mockStore.setErrorCode("topic1", Errors.NONE.code)
+    metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
+    assertEquals("topic1", metadata("topic1").topic)
+    assertEquals(Errors.NONE, metadata("topic1").error)
+    assertEquals(2, mockStore.numberOfCalls.get())
+
+    // Same time, error-free entry that is younger than the TTL: served from the cache, no store call.
+    metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
+    assertEquals("topic1", metadata("topic1").topic)
+    assertEquals(Errors.NONE, metadata("topic1").error)
+    assertEquals(2, mockStore.numberOfCalls.get())
+
+    // Advance time past the TTL (11 - 5 > 5): the entry is stale and the store must be queried.
+    mockTime.currentValue = 11
+    metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
+    assertEquals("topic1", metadata("topic1").topic)
+    assertEquals(Errors.NONE, metadata("topic1").error)
+    assertEquals(3, mockStore.numberOfCalls.get())
+  }
+
+  /** Several threads looking up the same uncached topic at once must trigger only one store query. */
+  @Test
+  def testMultiThreadedInteractionForTopicMetadataCache {
+    val mockStore = new MockTopicMetadataStore
+    val mockTime = new MockTime
+    val numThreads = 3
+    val waitForThreadStart = new CountDownLatch(numThreads)
+    val numAssertionSuccess = new AtomicBoolean(true)
+
+    mockTime.currentValue = 17
+    val threads = Array.fill(numThreads)(new Thread(new Runnable {
+      def run {
+        try {
+          // Release all workers together to maximize contention on the cache.
+          waitForThreadStart.countDown()
+          waitForThreadStart.await()
+          val metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
+          if (metadata("topic1").topic != "topic1" || metadata("topic1").error != Errors.NONE) {
+            numAssertionSuccess.set(false)
+          }
+        } catch {
+          // Without this, an exception would only kill the worker thread and the test could still pass.
+          case NonFatal(_) => numAssertionSuccess.set(false)
+        }
+      }
+    }))
+    threads.foreach(_.start())
+    // Bounded join so a deadlock in the cache fails the test instead of hanging the build.
+    threads.foreach(_.join(10000))
+    assertFalse("Worker threads did not finish", threads.exists(_.isAlive))
+    assertTrue(numAssertionSuccess.get())
+    assertEquals(1, mockStore.numberOfCalls.get())
+  }
+
+  /** A topic-level error or a partition-level error marks metadata as bad; error-free metadata is not bad. */
+  @Test
+  def testBadErrorCodes {
+    val partitionMetadataBad = new PartitionMetadata(0, None, Seq(), error = Errors.LEADER_NOT_AVAILABLE)
+    val partitionMetadataGood = new PartitionMetadata(0, None, Seq(), error = Errors.NONE)
+    assertTrue(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List.empty, Errors.REQUEST_TIMED_OUT)))
+    assertTrue(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List(partitionMetadataBad), Errors.NONE)))
+    assertFalse(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List.empty, Errors.NONE)))
+    assertFalse(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List(partitionMetadataGood), Errors.NONE)))
+  }
+}
