Repository: incubator-samza Updated Branches: refs/heads/master 80bf78204 -> 74232e2f4
SAMZA-233; start new broker proxies when abdicating Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/74232e2f Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/74232e2f Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/74232e2f Branch: refs/heads/master Commit: 74232e2f4fd0551451bf4c3acfe9109c61f4e611 Parents: 80bf782 Author: Chris Riccomini <[email protected]> Authored: Tue Apr 15 15:55:24 2014 -0700 Committer: Chris Riccomini <[email protected]> Committed: Tue Apr 15 15:55:24 2014 -0700 ---------------------------------------------------------------------- .../apache/samza/system/kafka/BrokerProxy.scala | 10 ++-- .../system/kafka/KafkaSystemConsumer.scala | 53 ++++++++++++-------- .../samza/system/kafka/KafkaSystemFactory.scala | 3 ++ .../system/kafka/TestKafkaSystemConsumer.scala | 53 +++++++++++++++++++- 4 files changed, 93 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/74232e2f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala index 88817ef..e08791f 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala @@ -268,9 +268,13 @@ class BrokerProxy( def start { info("Starting " + toString) - thread.setDaemon(true) - thread.setName(SAMZA_THREAD_NAME_PREFIX + BrokerProxy.BROKER_PROXY_THREAD_NAME_PREFIX + thread.getName) - thread.start + if (!thread.isAlive) { + thread.setDaemon(true) + thread.setName(SAMZA_THREAD_NAME_PREFIX + BrokerProxy.BROKER_PROXY_THREAD_NAME_PREFIX + thread.getName) + thread.start + } else { + debug("Tried to start an already started broker proxy (%s). Ignoring." format toString) + } } def stop { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/74232e2f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala index 511306f..1825fbb 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala @@ -19,7 +19,6 @@ package org.apache.samza.system.kafka -import org.apache.samza.util.ClientUtilTopicMetadataStore import kafka.common.TopicAndPartition import grizzled.slf4j.Logging import kafka.message.MessageAndOffset @@ -35,6 +34,10 @@ import org.apache.samza.system.IncomingMessageEnvelope import kafka.consumer.ConsumerConfig import org.apache.samza.util.ExponentialSleepStrategy import org.apache.samza.SamzaException +import org.apache.samza.util.TopicMetadataStore +import org.apache.samza.util.ExponentialSleepStrategy +import kafka.api.TopicMetadata +import org.apache.samza.util.ExponentialSleepStrategy object KafkaSystemConsumer { def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = { @@ -52,6 +55,7 @@ private[kafka] class KafkaSystemConsumer( systemName: String, brokerListString: String, metrics: KafkaSystemConsumerMetrics, + metadataStore: TopicMetadataStore, clientId: String = "undefined-client-id-%s" format UUID.randomUUID.toString, timeout: Int = ConsumerConfig.ConsumerTimeoutMs, bufferSize: Int = ConsumerConfig.SocketBufferSize, @@ -98,8 +102,6 @@ private[kafka] class KafkaSystemConsumer( } refreshBrokers(topicPartitionsAndOffsets) - - brokerProxies.values.foreach(_.start) } override def register(systemStreamPartition: SystemStreamPartition, offset: String) { @@ -114,36 +116,42 @@ private[kafka] class KafkaSystemConsumer( brokerProxies.values.foreach(_.stop) } + protected def createBrokerProxy(host: String, port: Int): BrokerProxy = { + new BrokerProxy(host, port, systemName, clientId, metrics, sink, timeout, bufferSize, fetchSize, consumerMinSize, consumerMaxWait, offsetGetter) + } + + protected def getHostPort(topicMetadata: TopicMetadata, partition: Int): Option[(String, Int)] = { + // Whatever we do, we can't say Broker, even though we're + // manipulating it here. Broker is a private type and Scala doesn't seem + // to care about that as long as you don't explicitly declare its type. + val brokerOption = topicMetadata + .partitionsMetadata + .find(_.partitionId == partition) + .flatMap(_.leader) + + brokerOption match { + case Some(broker) => Some(broker.host, broker.port) + case _ => None + } + } + def refreshBrokers(topicPartitionsAndOffsets: Map[TopicAndPartition, String]) { var tpToRefresh = topicPartitionsAndOffsets.keySet.toList retryBackoff.run( loop => { - val getTopicMetadata = (topics: Set[String]) => { - new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout).getTopicInfo(topics) - } val topics = tpToRefresh.map(_.topic).toSet - val partitionMetadata = TopicMetadataCache.getTopicMetadata(topics, systemName, getTopicMetadata) + val topicMetadata = TopicMetadataCache.getTopicMetadata(topics, systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics)) // addTopicPartition one at a time, leaving the to-be-done list intact in case of exceptions. // This avoids trying to re-add the same topic partition repeatedly def refresh(tp: List[TopicAndPartition]) = { val head :: rest = tpToRefresh val nextOffset = topicPartitionsAndOffsets.get(head).get - // Whatever we do, we can't say Broker, even though we're - // manipulating it here. Broker is a private type and Scala doesn't seem - // to care about that as long as you don't explicitly declare its type. - val brokerOption = partitionMetadata(head.topic) - .partitionsMetadata - .find(_.partitionId == head.partition) - .flatMap(_.leader) - - brokerOption match { - case Some(broker) => - def createBrokerProxy = new BrokerProxy(broker.host, broker.port, systemName, clientId, metrics, sink, timeout, bufferSize, fetchSize, consumerMinSize, consumerMaxWait, offsetGetter) - - brokerProxies - .getOrElseUpdate((broker.host, broker.port), createBrokerProxy) - .addTopicPartition(head, Option(nextOffset)) + getHostPort(topicMetadata(head.topic), head.partition) match { + case Some((host, port)) => + val brokerProxy = brokerProxies.getOrElseUpdate((host, port), createBrokerProxy(host, port)) + brokerProxy.addTopicPartition(head, Option(nextOffset)) + brokerProxy.start case None => warn("No such topic-partition: %s, dropping." format head) } rest @@ -152,6 +160,7 @@ private[kafka] class KafkaSystemConsumer( while (!tpToRefresh.isEmpty) { tpToRefresh = refresh(tpToRefresh) } + loop.done }, http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/74232e2f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala index d6e3a52..a5e8614 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala @@ -27,6 +27,7 @@ import org.apache.samza.SamzaException import kafka.producer.Producer import org.apache.samza.system.SystemFactory import org.apache.samza.util.ExponentialSleepStrategy +import org.apache.samza.util.ClientUtilTopicMetadataStore class KafkaSystemFactory extends SystemFactory { def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = { @@ -49,11 +50,13 @@ class KafkaSystemFactory extends SystemFactory { val autoOffsetResetTopics = config.getAutoOffsetResetTopics(systemName) val fetchThreshold = config.getConsumerFetchThreshold(systemName).getOrElse("50000").toInt val offsetGetter = new GetOffset(autoOffsetResetDefault, autoOffsetResetTopics) + val metadataStore = new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout) new KafkaSystemConsumer( systemName = systemName, brokerListString = brokerListString, metrics = metrics, + metadataStore = metadataStore, clientId = clientId, timeout = timeout, bufferSize = bufferSize, http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/74232e2f/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala index 93766cd..e05fd0b 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala @@ -24,11 +24,16 @@ import org.junit.Assert._ import org.apache.samza.system.SystemStreamPartition import org.apache.samza.Partition import kafka.common.TopicAndPartition +import org.apache.samza.util.TopicMetadataStore +import kafka.api.TopicMetadata +import kafka.api.PartitionMetadata +import kafka.cluster.Broker class TestKafkaSystemConsumer { @Test def testFetchThresholdShouldDivideEvenlyAmongPartitions { - val consumer = new KafkaSystemConsumer("", "", new KafkaSystemConsumerMetrics, fetchThreshold = 50000) { + val metadataStore = new MockMetadataStore + val consumer = new KafkaSystemConsumer("", "", new KafkaSystemConsumerMetrics, metadataStore, fetchThreshold = 50000) { override def refreshBrokers(topicPartitionsAndOffsets: Map[TopicAndPartition, String]) { } } @@ -41,4 +46,50 @@ class TestKafkaSystemConsumer { assertEquals(1000, consumer.perPartitionFetchThreshold) } + + @Test + def testBrokerCreationShouldTriggerStart { + val systemName = "test-system" + val streamName = "test-stream" + val metrics = new KafkaSystemConsumerMetrics + // Lie and tell the store that the partition metadata is empty. We can't + // use partition metadata because it has Broker in its constructor, which + // is package private to Kafka. + val metadataStore = new MockMetadataStore(Map(streamName -> TopicMetadata(streamName, Seq.empty, 0))) + var hosts = List[String]() + var getHostPortCount = 0 + val consumer = new KafkaSystemConsumer(systemName, streamName, metrics, metadataStore) { + override def getHostPort(topicMetadata: TopicMetadata, partition: Int): Option[(String, Int)] = { + // Generate a unique host every time getHostPort is called. + getHostPortCount += 1 + Some("localhost-%s" format getHostPortCount, 0) + } + + override def createBrokerProxy(host: String, port: Int): BrokerProxy = { + new BrokerProxy(host, port, systemName, "", metrics, sink) { + override def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = { + // Skip this since we normally do verification of offsets, which + // tries to connect to Kafka. Rather than mock that, just forget it. + nextOffsets.size + } + + override def start { + hosts :+= host + } + } + } + } + + consumer.register(new SystemStreamPartition(systemName, streamName, new Partition(0)), "1") + assertEquals(0, hosts.size) + consumer.start + assertEquals(List("localhost-1"), hosts) + // Should trigger a refresh with a new host. + consumer.sink.abdicate(new TopicAndPartition(streamName, 0), 2) + assertEquals(List("localhost-1", "localhost-2"), hosts) + } +} + +class MockMetadataStore(var metadata: Map[String, TopicMetadata] = Map()) extends TopicMetadataStore { + def getTopicInfo(topics: Set[String]): Map[String, TopicMetadata] = metadata } \ No newline at end of file
