Repository: incubator-samza Updated Branches: refs/heads/master 38e828832 -> 048ffd2fe
SAMZA-16; fix java 7 test incompatibility Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/048ffd2f Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/048ffd2f Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/048ffd2f Branch: refs/heads/master Commit: 048ffd2fee118b5d15ca9fd962477ee8a1dc67c8 Parents: 38e8288 Author: Martin Kleppmann <[email protected]> Authored: Mon Mar 10 09:31:19 2014 -0700 Committer: Chris Riccomini <[email protected]> Committed: Mon Mar 10 09:31:19 2014 -0700 ---------------------------------------------------------------------- .../samza/system/kafka/TopicMetadataCache.scala | 4 ++ .../system/kafka/TestTopicMetadataCache.scala | 68 +++++++++++--------- 2 files changed, 40 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/048ffd2f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala index 8a24ce3..8a8834f 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala @@ -65,4 +65,8 @@ object TopicMetadataCache extends Logging { }.toMap } } + + def clear { + topicMetadataMap.clear + } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/048ffd2f/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala index 3c9c8d6..9ddcb71 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala @@ -18,6 +18,9 @@ */ package org.apache.samza.system.kafka + +import org.junit.Assert._ +import org.junit.Before import org.junit.Test import kafka.api.TopicMetadata import org.apache.samza.util.TopicMetadataStore @@ -28,7 +31,7 @@ import org.apache.samza.util.Clock class TestTopicMetadataCache { - object MockTime extends Clock { + class MockTime extends Clock { var currentValue = 0 def currentTimeMillis: Long = currentValue @@ -46,66 +49,67 @@ class TestTopicMetadataCache { numberOfCalls.getAndIncrement topicMetadata } - /* - def onTopicMissingMock(topic: String, zkClient: ZkClient) { - mockCache += topic -> new TopicMetadata(topic, List.empty, 0) - } - */ def setErrorCode(topic: String, errorCode: Short) { mockCache += topic -> new TopicMetadata(topic, List.empty, errorCode) } - def resetNumOfCalls = numberOfCalls = new AtomicInteger(0) } - val mockStore = new MockTopicMetadataStore() - val waitForThreadStart = new CountDownLatch(3) + @Before def setup { + TopicMetadataCache.clear + } @Test def testBasicMetadataCacheFunctionality { + val mockStore = new MockTopicMetadataStore + val mockTime = new MockTime + // Retrieve a topic from the cache. Initially cache is empty and store is queried to get the data mockStore.setErrorCode("topic1", 3) - var metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, () => MockTime.currentTimeMillis) - assert(metadata("topic1").topic.equals("topic1")) - assert(metadata("topic1").errorCode == 3) - assert(mockStore.numberOfCalls.get() == 1) + var metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis) + assertEquals("topic1", metadata("topic1").topic) + assertEquals(3, metadata("topic1").errorCode) + assertEquals(1, mockStore.numberOfCalls.get()) // Retrieve the same topic from the cache which has an error code. Ensure the store is called to refresh the cache - MockTime.currentValue = 5 + mockTime.currentValue = 5 mockStore.setErrorCode("topic1", 0) - metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, () => MockTime.currentTimeMillis) - assert(metadata("topic1").topic.equals("topic1")) - assert(metadata("topic1").errorCode == 0) - assert(mockStore.numberOfCalls.get() == 2) + metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis) + assertEquals("topic1", metadata("topic1").topic) + assertEquals(0, metadata("topic1").errorCode) + assertEquals(2, mockStore.numberOfCalls.get()) // Retrieve the same topic from the cache with refresh rate greater than the last update. Ensure the store is not // called - metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, () => MockTime.currentTimeMillis) - assert(metadata("topic1").topic.equals("topic1")) - assert(metadata("topic1").errorCode == 0) - assert(mockStore.numberOfCalls.get() == 2) + metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis) + assertEquals("topic1", metadata("topic1").topic) + assertEquals(0, metadata("topic1").errorCode) + assertEquals(2, mockStore.numberOfCalls.get()) // Ensure that refresh happens when refresh rate is less than the last update. Ensure the store is called - MockTime.currentValue = 11 - metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, () => MockTime.currentTimeMillis) - assert(metadata("topic1").topic.equals("topic1")) - assert(metadata("topic1").errorCode == 0) - assert(mockStore.numberOfCalls.get() == 3) + mockTime.currentValue = 11 + metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis) + assertEquals("topic1", metadata("topic1").topic) + assertEquals(0, metadata("topic1").errorCode) + assertEquals(3, mockStore.numberOfCalls.get()) } @Test def testMultiThreadedInteractionForTopicMetadataCache { - mockStore.resetNumOfCalls - MockTime.currentValue = 17 + val mockStore = new MockTopicMetadataStore + val mockTime = new MockTime + val waitForThreadStart = new CountDownLatch(3) val numAssertionSuccess = new AtomicBoolean(true) // Add topic to the cache from multiple threads and ensure the store is called only once val threads = new Array[Thread](3) + + mockTime.currentValue = 17 for (i <- 0 until 3) { threads(i) = new Thread(new Runnable { def run { waitForThreadStart.countDown() waitForThreadStart.await() - val metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, () => MockTime.currentTimeMillis) + val metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis) numAssertionSuccess.compareAndSet(true, metadata("topic1").topic.equals("topic1")) numAssertionSuccess.compareAndSet(true, metadata("topic1").errorCode == 0) } @@ -115,7 +119,7 @@ class TestTopicMetadataCache { for (i <- 0 until 3) { threads(i).join } - assert(numAssertionSuccess.get() == true) - assert(mockStore.numberOfCalls.get() == 1) + assertTrue(numAssertionSuccess.get()) + assertEquals(1, mockStore.numberOfCalls.get()) } }
