Repository: samza
Updated Branches:
  refs/heads/master 078afb57f -> 003ad1068
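For orientation, a minimal sketch (not part of the patch) of the consumer wiring the diffs below converge on: the factory derives the client id from job.name/job.id via KafkaConsumerConfig, builds the underlying Kafka consumer through KafkaSystemConsumer.getKafkaConsumerImpl, and hands it to KafkaSystemConsumer together with per-client metrics. The config keys and values here are illustrative only; the calls themselves appear in the KafkaSystemFactory diff.

    import scala.collection.JavaConverters._
    import org.apache.samza.config.{KafkaConsumerConfig, MapConfig}
    import org.apache.samza.metrics.MetricsRegistryMap
    import org.apache.samza.system.kafka.{KafkaSystemConsumer, KafkaSystemConsumerMetrics}
    import org.apache.samza.util.SystemClock

    object ConsumerWiringSketch {
      def main(args: Array[String]): Unit = {
        val systemName = "kafka"
        // Illustrative config: job.name plus the consumer bootstrap servers.
        val config = new MapConfig(Map(
          "job.name" -> "jobName",
          s"systems.$systemName.consumer.bootstrap.servers" -> "localhost:9092").asJava)

        // Client id is derived from job.name/job.id (see TestKafkaConsumerConfig below).
        val clientId = KafkaConsumerConfig.getConsumerClientId(config)
        val metrics = new KafkaSystemConsumerMetrics(systemName, new MetricsRegistryMap)

        // The factory now delegates construction of the high-level consumer,
        // then wraps it in the new KafkaSystemConsumer.
        val kafkaConsumer = KafkaSystemConsumer.getKafkaConsumerImpl(systemName, clientId, config)
        val consumer = new KafkaSystemConsumer(kafkaConsumer, systemName, config, clientId, metrics, new SystemClock)
      }
    }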
http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala index 51545a0..59a8854 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala @@ -19,13 +19,10 @@ package org.apache.samza.system.kafka -import org.apache.samza.metrics.MetricsHelper -import org.apache.samza.metrics.MetricsRegistryMap -import org.apache.samza.metrics.MetricsRegistry import java.util.concurrent.ConcurrentHashMap + import kafka.common.TopicAndPartition -import org.apache.samza.metrics.Counter -import org.apache.samza.metrics.Gauge +import org.apache.samza.metrics._ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper { val offsets = new ConcurrentHashMap[TopicAndPartition, Counter] @@ -34,67 +31,66 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr val lag = new ConcurrentHashMap[TopicAndPartition, Gauge[Long]] val highWatermark = new ConcurrentHashMap[TopicAndPartition, Gauge[Long]] - /* - * (String, Int) = (host, port) of BrokerProxy. - */ - - val reconnects = new ConcurrentHashMap[(String, Int), Counter] - val brokerBytesRead = new ConcurrentHashMap[(String, Int), Counter] - val brokerReads = new ConcurrentHashMap[(String, Int), Counter] - val brokerSkippedFetchRequests = new ConcurrentHashMap[(String, Int), Counter] - val topicPartitions = new ConcurrentHashMap[(String, Int), Gauge[Int]] + val clientBytesRead = new ConcurrentHashMap[String, Counter] + val clientReads = new ConcurrentHashMap[String, Counter] + val clientSkippedFetchRequests = new ConcurrentHashMap[String, Counter] + val topicPartitions = new ConcurrentHashMap[String, Gauge[Int]] def registerTopicAndPartition(tp: TopicAndPartition) = { if (!offsets.contains(tp)) { - offsets.put(tp, newCounter("%s-%s-offset-change" format (tp.topic, tp.partition))) - bytesRead.put(tp, newCounter("%s-%s-bytes-read" format (tp.topic, tp.partition))) - reads.put(tp, newCounter("%s-%s-messages-read" format (tp.topic, tp.partition))) - highWatermark.put(tp, newGauge("%s-%s-high-watermark" format (tp.topic, tp.partition), -1L)) - lag.put(tp, newGauge("%s-%s-messages-behind-high-watermark" format (tp.topic, tp.partition), 0L)) + offsets.put(tp, newCounter("%s-%s-offset-change" format(tp.topic, tp.partition))) + bytesRead.put(tp, newCounter("%s-%s-bytes-read" format(tp.topic, tp.partition))) + reads.put(tp, newCounter("%s-%s-messages-read" format(tp.topic, tp.partition))) + highWatermark.put(tp, newGauge("%s-%s-high-watermark" format(tp.topic, tp.partition), -1L)) + lag.put(tp, newGauge("%s-%s-messages-behind-high-watermark" format(tp.topic, tp.partition), 0L)) } } - def registerBrokerProxy(host: String, port: Int) { - reconnects.put((host, port), newCounter("%s-%s-reconnects" format (host, port))) - brokerBytesRead.put((host, port), newCounter("%s-%s-bytes-read" format (host, port))) - brokerReads.put((host, port), newCounter("%s-%s-messages-read" format (host, port))) - brokerSkippedFetchRequests.put((host, port), newCounter("%s-%s-skipped-fetch-requests" 
format (host, port))) - topicPartitions.put((host, port), newGauge("%s-%s-topic-partitions" format (host, port), 0)) + def registerClientProxy(clientName: String) { + clientBytesRead.put(clientName, newCounter("%s-bytes-read" format clientName)) + clientReads.put((clientName), newCounter("%s-messages-read" format clientName)) + clientSkippedFetchRequests.put((clientName), newCounter("%s-skipped-fetch-requests" format clientName)) + topicPartitions.put(clientName, newGauge("%s-registered-topic-partitions" format clientName, 0)) } // java friendlier interfaces // Gauges - def setTopicPartitionValue(host: String, port: Int, value: Int) { - topicPartitions.get((host,port)).set(value) + def setNumTopicPartitions(clientName: String, value: Int) { + topicPartitions.get(clientName).set(value) } + def setLagValue(topicAndPartition: TopicAndPartition, value: Long) { lag.get((topicAndPartition)).set(value); } + def setHighWatermarkValue(topicAndPartition: TopicAndPartition, value: Long) { highWatermark.get((topicAndPartition)).set(value); } // Counters - def incBrokerReads(host: String, port: Int) { - brokerReads.get((host,port)).inc + def incClientReads(clientName: String) { + clientReads.get(clientName).inc } + def incReads(topicAndPartition: TopicAndPartition) { reads.get(topicAndPartition).inc; } + def incBytesReads(topicAndPartition: TopicAndPartition, inc: Long) { bytesRead.get(topicAndPartition).inc(inc); } - def incBrokerBytesReads(host: String, port: Int, incBytes: Long) { - brokerBytesRead.get((host,port)).inc(incBytes) + + def incClientBytesReads(clientName: String, incBytes: Long) { + clientBytesRead.get(clientName).inc(incBytes) } - def incBrokerSkippedFetchRequests(host: String, port: Int) { - brokerSkippedFetchRequests.get((host,port)).inc() + + def incClientSkippedFetchRequests(clientName: String) { + clientSkippedFetchRequests.get(clientName).inc() } + def setOffsets(topicAndPartition: TopicAndPartition, offset: Long) { offsets.get(topicAndPartition).set(offset) } - def incReconnects(host: String, port: Int) { - reconnects.get((host,port)).inc() - } + override def getPrefix = systemName + "-" } http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala index 9f0b5f2..ba5390b 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala @@ -20,21 +20,19 @@ package org.apache.samza.system.kafka import java.util.Properties + import kafka.utils.ZkUtils +import org.apache.kafka.clients.producer.KafkaProducer import org.apache.samza.SamzaException import org.apache.samza.config.ApplicationConfig.ApplicationMode -import org.apache.samza.util.{Logging, KafkaUtil, ExponentialSleepStrategy, ClientUtilTopicMetadataStore} -import org.apache.samza.config.{KafkaConfig, ApplicationConfig, StreamConfig, Config} -import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.config.KafkaConfig.Config2Kafka -import org.apache.samza.config.TaskConfig.Config2Task -import org.apache.kafka.clients.producer.KafkaProducer -import org.apache.samza.system.SystemFactory import org.apache.samza.config.StorageConfig._ -import 
org.apache.samza.system.SystemProducer -import org.apache.samza.system.SystemAdmin import org.apache.samza.config.SystemConfig.Config2System -import org.apache.samza.system.SystemConsumer +import org.apache.samza.config.TaskConfig.Config2Task +import org.apache.samza.config._ +import org.apache.samza.metrics.MetricsRegistry +import org.apache.samza.system.{SystemAdmin, SystemConsumer, SystemFactory, SystemProducer} +import org.apache.samza.util._ object KafkaSystemFactory extends Logging { def getInjectedProducerProperties(systemName: String, config: Config) = if (config.isChangelogSystem(systemName)) { @@ -46,49 +44,27 @@ object KafkaSystemFactory extends Logging { } class KafkaSystemFactory extends SystemFactory with Logging { + def getConsumer(systemName: String, config: Config, registry: MetricsRegistry): SystemConsumer = { - val clientId = KafkaUtil.getClientId("samza-consumer", config) + val clientId = KafkaConsumerConfig.getConsumerClientId( config) val metrics = new KafkaSystemConsumerMetrics(systemName, registry) - // Kind of goofy to need a producer config for consumers, but we need metadata. - val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId) - val bootstrapServers = producerConfig.bootsrapServers - val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId) + val kafkaConsumer = KafkaSystemConsumer.getKafkaConsumerImpl(systemName, clientId, config) + info("Created kafka consumer for system %s, clientId %s: %s" format (systemName, clientId, kafkaConsumer)) - val timeout = consumerConfig.socketTimeoutMs - val bufferSize = consumerConfig.socketReceiveBufferBytes - val fetchSize = new StreamFetchSizes(consumerConfig.fetchMessageMaxBytes, config.getFetchMessageMaxBytesTopics(systemName)) - val consumerMinSize = consumerConfig.fetchMinBytes - val consumerMaxWait = consumerConfig.fetchWaitMaxMs - val autoOffsetResetDefault = consumerConfig.autoOffsetReset - val autoOffsetResetTopics = config.getAutoOffsetResetTopics(systemName) - val fetchThreshold = config.getConsumerFetchThreshold(systemName).getOrElse("50000").toInt - val fetchThresholdBytes = config.getConsumerFetchThresholdBytes(systemName).getOrElse("-1").toLong - val offsetGetter = new GetOffset(autoOffsetResetDefault, autoOffsetResetTopics) - val metadataStore = new ClientUtilTopicMetadataStore(bootstrapServers, clientId, timeout) - - new KafkaSystemConsumer( - systemName = systemName, - systemAdmin = getAdmin(systemName, config), - metrics = metrics, - metadataStore = metadataStore, - clientId = clientId, - timeout = timeout, - bufferSize = bufferSize, - fetchSize = fetchSize, - consumerMinSize = consumerMinSize, - consumerMaxWait = consumerMaxWait, - fetchThreshold = fetchThreshold, - fetchThresholdBytes = fetchThresholdBytes, - fetchLimitByBytesEnabled = config.isConsumerFetchThresholdBytesEnabled(systemName), - offsetGetter = offsetGetter) + val kafkaSystemConsumer = new KafkaSystemConsumer(kafkaConsumer, systemName, config, clientId, metrics, new SystemClock) + info("Created samza system consumer %s" format (kafkaSystemConsumer.toString)) + + kafkaSystemConsumer } def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer = { - val clientId = KafkaUtil.getClientId("samza-producer", config) + val clientId = KafkaConsumerConfig.getProducerClientId(config) val injectedProps = KafkaSystemFactory.getInjectedProducerProperties(systemName, config) val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId, injectedProps) - val 
getProducer = () => { new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties) } + val getProducer = () => { + new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties) + } val metrics = new KafkaSystemProducerMetrics(systemName, registry) // Unlike consumer, no need to use encoders here, since they come for free @@ -104,7 +80,7 @@ class KafkaSystemFactory extends SystemFactory with Logging { } def getAdmin(systemName: String, config: Config): SystemAdmin = { - val clientId = KafkaUtil.getClientId("samza-admin", config) + val clientId = KafkaConsumerConfig.getAdminClientId(config) val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId) val bootstrapServers = producerConfig.bootsrapServers val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId) @@ -119,13 +95,13 @@ class KafkaSystemFactory extends SystemFactory with Logging { val coordinatorStreamReplicationFactor = config.getCoordinatorReplicationFactor.toInt val storeToChangelog = config.getKafkaChangelogEnabledStores() // Construct the meta information for each topic, if the replication factor is not defined, we use 2 as the number of replicas for the change log stream. - val topicMetaInformation = storeToChangelog.map{case (storeName, topicName) => - { - val replicationFactor = config.getChangelogStreamReplicationFactor(storeName).toInt - val changelogInfo = ChangelogInfo(replicationFactor, config.getChangelogKafkaProperties(storeName)) - info("Creating topic meta information for topic: %s with replication factor: %s" format (topicName, replicationFactor)) - (topicName, changelogInfo) - }} + val topicMetaInformation = storeToChangelog.map { case (storeName, topicName) => { + val replicationFactor = config.getChangelogStreamReplicationFactor(storeName).toInt + val changelogInfo = ChangelogInfo(replicationFactor, config.getChangelogKafkaProperties(storeName)) + info("Creating topic meta information for topic: %s with replication factor: %s" format(topicName, replicationFactor)) + (topicName, changelogInfo) + } + } val deleteCommittedMessages = config.deleteCommittedMessages(systemName).exists(isEnabled => isEnabled.toBoolean) val intermediateStreamProperties: Map[String, Properties] = getIntermediateStreamProperties(config) @@ -150,7 +126,7 @@ class KafkaSystemFactory extends SystemFactory with Logging { "segment.bytes" -> segmentBytes)) { case (props, (k, v)) => props.put(k, v); props } } - def getIntermediateStreamProperties(config : Config): Map[String, Properties] = { + def getIntermediateStreamProperties(config: Config): Map[String, Properties] = { val appConfig = new ApplicationConfig(config) if (appConfig.getAppMode == ApplicationMode.BATCH) { val streamConfig = new StreamConfig(config) http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java b/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java new file mode 100644 index 0000000..de5d093 --- /dev/null +++ b/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samza.config; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.samza.SamzaException; +import org.junit.Assert; +import org.junit.Test; + + +public class TestKafkaConsumerConfig { + + public final static String SYSTEM_NAME = "testSystem"; + public final static String KAFKA_PRODUCER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".producer."; + public final static String KAFKA_CONSUMER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".consumer."; + private final static String CLIENT_ID = "clientId"; + + @Test + public void testDefaults() { + Map<String, String> props = new HashMap<>(); + + props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // should be ignored + props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, + "Ignore"); // should be ignored + props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, + "100"); // should NOT be ignored + + props.put(JobConfig.JOB_NAME(), "jobName"); + + // if KAFKA_CONSUMER_PROPERTY_PREFIX is set, then PRODUCER should be ignored + props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "ignroeThis:9092"); + props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + "bootstrap.servers", "useThis:9092"); + + Config config = new MapConfig(props); + KafkaConsumerConfig kafkaConsumerConfig = + KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, SYSTEM_NAME, CLIENT_ID); + + Assert.assertEquals("false", kafkaConsumerConfig.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); + + Assert.assertEquals(KafkaConsumerConfig.DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS, + kafkaConsumerConfig.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)); + + Assert.assertEquals(RangeAssignor.class.getName(), + kafkaConsumerConfig.get(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)); + + Assert.assertEquals("useThis:9092", kafkaConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + Assert.assertEquals("100", kafkaConsumerConfig.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)); + + Assert.assertEquals(ByteArrayDeserializer.class.getName(), + kafkaConsumerConfig.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)); + + Assert.assertEquals(ByteArrayDeserializer.class.getName(), + kafkaConsumerConfig.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)); + + Assert.assertEquals(CLIENT_ID, kafkaConsumerConfig.get(ConsumerConfig.CLIENT_ID_CONFIG)); + + Assert.assertEquals(KafkaConsumerConfig.getConsumerGroupId(config), + kafkaConsumerConfig.get(ConsumerConfig.GROUP_ID_CONFIG)); + + Assert.assertEquals(KafkaConsumerConfig.CONSUMER_CLIENT_ID_PREFIX.replace("-", "_") + "-jobName-1", + KafkaConsumerConfig.getConsumerClientId(config)); + 
Assert.assertEquals("jobName-1", KafkaConsumerConfig.getConsumerGroupId(config)); + + props.put(JobConfig.JOB_ID(), "jobId"); + config = new MapConfig(props); + + Assert.assertEquals(KafkaConsumerConfig.CONSUMER_CLIENT_ID_PREFIX.replace("-", "_") + "-jobName-jobId", + KafkaConsumerConfig.getConsumerClientId(config)); + Assert.assertEquals("jobName-jobId", KafkaConsumerConfig.getConsumerGroupId(config)); + } + + // test stuff that should not be overridden + @Test + public void testNotOverride() { + Map<String, String> props = new HashMap<>(); + + // if KAFKA_CONSUMER_PROPERTY_PREFIX is not set, then PRODUCER should be used + props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "useThis:9092"); + props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + TestKafkaConsumerConfig.class.getName()); + props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + TestKafkaConsumerConfig.class.getName()); + + props.put(JobConfig.JOB_NAME(), "jobName"); + + Config config = new MapConfig(props); + KafkaConsumerConfig kafkaConsumerConfig = + KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, SYSTEM_NAME, CLIENT_ID); + + Assert.assertEquals("useThis:9092", kafkaConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + + Assert.assertEquals(TestKafkaConsumerConfig.class.getName(), + kafkaConsumerConfig.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)); + + Assert.assertEquals(TestKafkaConsumerConfig.class.getName(), + kafkaConsumerConfig.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)); + } + + @Test + public void testGetConsumerClientId() { + Map<String, String> map = new HashMap<>(); + + map.put(JobConfig.JOB_NAME(), "jobName"); + map.put(JobConfig.JOB_ID(), "jobId"); + String result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map)); + Assert.assertEquals("consumer-jobName-jobId", result); + + result = KafkaConsumerConfig.getConsumerClientId("consumer-", new MapConfig(map)); + Assert.assertEquals("consumer_-jobName-jobId", result); + + result = KafkaConsumerConfig.getConsumerClientId("super-duper-consumer", new MapConfig(map)); + Assert.assertEquals("super_duper_consumer-jobName-jobId", result); + + map.put(JobConfig.JOB_NAME(), " very important!job"); + result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map)); + Assert.assertEquals("consumer-_very_important_job-jobId", result); + + map.put(JobConfig.JOB_ID(), "number-#3"); + result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map)); + Assert.assertEquals("consumer-_very_important_job-number__3", result); + } + + @Test(expected = SamzaException.class) + public void testNoBootstrapServers() { + KafkaConsumerConfig kafkaConsumerConfig = + KafkaConsumerConfig.getKafkaSystemConsumerConfig(new MapConfig(Collections.emptyMap()), SYSTEM_NAME, + "clientId"); + + Assert.fail("didn't get exception for the missing config:" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java index 77f47f9..7e968bf 100644 --- 
a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java +++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java @@ -19,17 +19,14 @@ package org.apache.samza.system.kafka; -import java.util.*; import java.util.HashMap; import java.util.Map; - +import java.util.Properties; import kafka.api.TopicMetadata; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.StreamValidationException; import org.apache.samza.system.SystemAdmin; import org.apache.samza.util.ScalaJavaUtil; -import org.junit.After; -import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -71,7 +68,6 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin { admin.createStream(spec); admin.validateStream(spec); - } @Test @@ -143,7 +139,8 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin { public void testCreateStream() { StreamSpec spec = new StreamSpec("testId", "testStream", "testSystem", 8); - assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec)); + assertTrue("createStream should return true if the stream does not exist and then is created.", + systemAdmin().createStream(spec)); systemAdmin().validateStream(spec); assertFalse("createStream should return false if the stream already exists.", systemAdmin().createStream(spec)); @@ -162,7 +159,8 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin { StreamSpec spec1 = new StreamSpec("testId", "testStreamPartition", "testSystem", 8); StreamSpec spec2 = new StreamSpec("testId", "testStreamPartition", "testSystem", 4); - assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec1)); + assertTrue("createStream should return true if the stream does not exist and then is created.", + systemAdmin().createStream(spec1)); systemAdmin().validateStream(spec2); } @@ -172,7 +170,8 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin { StreamSpec spec1 = new StreamSpec("testId", "testStreamName1", "testSystem", 8); StreamSpec spec2 = new StreamSpec("testId", "testStreamName2", "testSystem", 8); - assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec1)); + assertTrue("createStream should return true if the stream does not exist and then is created.", + systemAdmin().createStream(spec1)); systemAdmin().validateStream(spec2); } @@ -181,7 +180,8 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin { public void testClearStream() { StreamSpec spec = new StreamSpec("testId", "testStreamClear", "testSystem", 8); - assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec)); + assertTrue("createStream should return true if the stream does not exist and then is created.", + systemAdmin().createStream(spec)); assertTrue(systemAdmin().clearStream(spec)); scala.collection.immutable.Set<String> topic = new scala.collection.immutable.Set.Set1<>(spec.getPhysicalName()); http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala deleted file mode 100644 index d510076..0000000 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala +++ /dev/null @@ -1,434 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.samza.system.kafka - -import java.nio.ByteBuffer -import java.util.concurrent.CountDownLatch - -import kafka.api.{PartitionOffsetsResponse, _} -import kafka.common.TopicAndPartition -import kafka.consumer.SimpleConsumer -import kafka.message.{ByteBufferMessageSet, Message, MessageAndOffset, MessageSet} -import org.apache.kafka.common.protocol.Errors -import org.apache.samza.SamzaException -import org.apache.samza.util.Logging -import org.junit.Assert._ -import org.junit._ -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.mockito.invocation.InvocationOnMock -import org.mockito.stubbing.Answer -import org.mockito.{Matchers, Mockito} - -import scala.collection.JavaConverters._ - -class TestBrokerProxy extends Logging { - val tp2 = new TopicAndPartition("Redbird", 2013) - var fetchTp1 = true // control whether fetching tp1 messages or not - - @Test def brokerProxyRetrievesMessagesCorrectly() = { - val (bp, tp, sink) = getMockBrokerProxy() - - bp.start - bp.addTopicPartition(tp, Option("0")) - // Add tp2, which should never receive messages since sink disables it. - bp.addTopicPartition(tp2, Option("0")) - Thread.sleep(1000) - assertEquals(2, sink.receivedMessages.size) - assertEquals(42, sink.receivedMessages(0)._2.offset) - assertEquals(84, sink.receivedMessages(1)._2.offset) - } - - @Test def brokerProxySkipsFetchForEmptyRequests() = { - val (bp, tp, sink) = getMockBrokerProxy() - - bp.start - // Only add tp2, which should never receive messages since sink disables it. 
- bp.addTopicPartition(tp2, Option("0")) - Thread.sleep(1000) - assertEquals(0, sink.receivedMessages.size) - assertTrue(bp.metrics.brokerSkippedFetchRequests.get((bp.host, bp.port)).getCount > 0) - assertEquals(0, bp.metrics.brokerReads.get((bp.host, bp.port)).getCount) - } - - @Test def brokerProxyThrowsExceptionOnDuplicateTopicPartitions() = { - val (bp, tp, _) = getMockBrokerProxy() - bp.start - bp.addTopicPartition(tp, Option("0")) - - try { - bp.addTopicPartition(tp, Option("1")) - fail("Should have thrown an exception") - } catch { - case se: SamzaException => assertEquals(se.getMessage, "Already consuming TopicPartition [Redbird,2012]") - case other: Exception => fail("Got some other exception than what we were expecting: " + other) - } - } - - def getMockBrokerProxy() = { - val sink = new MessageSink { - val receivedMessages = new scala.collection.mutable.ListBuffer[(TopicAndPartition, MessageAndOffset, Boolean)]() - - def abdicate(tp: TopicAndPartition, nextOffset: Long) {} - - def refreshDropped() {} - - def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) { - receivedMessages += ((tp, msg, msg.offset.equals(highWatermark))) - } - - def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) { - } - - // Never need messages for tp2. - def needsMoreMessages(tp: TopicAndPartition): Boolean = !tp.equals(tp2) && fetchTp1 - } - - val system = "daSystem" - val host = "host" - val port = 2222 - val tp = new TopicAndPartition("Redbird", 2012) - val metrics = new KafkaSystemConsumerMetrics(system) - - metrics.registerBrokerProxy(host, port) - metrics.registerTopicAndPartition(tp) - metrics.topicPartitions.get((host, port)).set(1) - - val bp = new BrokerProxy( - host, - port, - system, - "daClientId", - metrics, - sink, - offsetGetter = new GetOffset("fail", Map("Redbird" -> "largest"))) { - - override val sleepMSWhileNoTopicPartitions = 100 - // Speed up for test - var alreadyCreatedConsumer = false - - // Scala traits and Mockito mocks don't mix, unfortunately. 
- override def createSimpleConsumer() = { - if (alreadyCreatedConsumer) { - System.err.println("Should only be creating one consumer in this test!") - throw new InterruptedException("Should only be creating one consumer in this test!") - } - alreadyCreatedConsumer = true - - new DefaultFetchSimpleConsumer("a", 1, 2, 3, "b", new StreamFetchSizes(42)) { - val sc = Mockito.mock(classOf[SimpleConsumer]) - val mockOffsetResponse = { - val offsetResponse = Mockito.mock(classOf[OffsetResponse]) - val partitionOffsetResponse = { - val por = Mockito.mock(classOf[PartitionOffsetsResponse]) - when(por.offsets).thenReturn(List(1l).toSeq) - por - } - - val map = scala.Predef.Map[TopicAndPartition, PartitionOffsetsResponse](tp -> partitionOffsetResponse, tp2 -> partitionOffsetResponse) - when(offsetResponse.partitionErrorAndOffsets).thenReturn(map) - offsetResponse - } - - when(sc.getOffsetsBefore(any(classOf[OffsetRequest]))).thenReturn(mockOffsetResponse) - - val fetchResponse = { - val fetchResponse = Mockito.mock(classOf[FetchResponse]) - - val messageSet = { - val messageSet = Mockito.mock(classOf[ByteBufferMessageSet]) - - def getMessage() = new Message(Mockito.mock(classOf[ByteBuffer])) - val messages = List(new MessageAndOffset(getMessage, 42), new MessageAndOffset(getMessage, 84)) - - when(messageSet.sizeInBytes).thenReturn(43) - when(messageSet.size).thenReturn(44) - when(messageSet.iterator).thenReturn(messages.iterator) - when(messageSet.head).thenReturn(messages.head) - messageSet - } - - val fetchResponsePartitionData = FetchResponsePartitionData(Errors.NONE, 500, messageSet) - val map = scala.Predef.Map[TopicAndPartition, FetchResponsePartitionData](tp -> fetchResponsePartitionData) - - when(fetchResponse.data).thenReturn(map.toSeq) - when(fetchResponse.messageSet(any(classOf[String]), any(classOf[Int]))).thenReturn(messageSet) - fetchResponse - } - when(sc.fetch(any(classOf[FetchRequest]))).thenReturn(fetchResponse) - - override def close() = sc.close() - - override def send(request: TopicMetadataRequest): TopicMetadataResponse = sc.send(request) - - override def fetch(request: FetchRequest): FetchResponse = { - // Verify that we only get fetch requests for one tp, even though - // two were registered. This is to verify that - // sink.needsMoreMessages works. 
- assertEquals(1, request.requestInfo.size) - sc.fetch(request) - } - - when(sc.earliestOrLatestOffset(any(classOf[TopicAndPartition]), any(classOf[Long]), any(classOf[Int]))).thenReturn(100) - - override def getOffsetsBefore(request: OffsetRequest): OffsetResponse = sc.getOffsetsBefore(request) - - override def commitOffsets(request: OffsetCommitRequest): OffsetCommitResponse = sc.commitOffsets(request) - - override def fetchOffsets(request: OffsetFetchRequest): OffsetFetchResponse = sc.fetchOffsets(request) - - override def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, earliestOrLatest: Long, consumerId: Int): Long = sc.earliestOrLatestOffset(topicAndPartition, earliestOrLatest, consumerId) - } - } - - } - - (bp, tp, sink) - } - - @Test def brokerProxyUpdateLatencyMetrics() = { - val (bp, tp, _) = getMockBrokerProxy() - - bp.start - bp.addTopicPartition(tp, Option("0")) - Thread.sleep(1000) - // update when fetching messages - assertEquals(500, bp.metrics.highWatermark.get(tp).getValue) - assertEquals(415, bp.metrics.lag.get(tp).getValue) - - fetchTp1 = false - Thread.sleep(1000) - // update when not fetching messages - assertEquals(100, bp.metrics.highWatermark.get(tp).getValue) - assertEquals(15, bp.metrics.lag.get(tp).getValue) - - fetchTp1 = true - } - - @Test def brokerProxyCorrectlyHandlesOffsetOutOfRange(): Unit = { - // Need to wait for the thread to do some work before ending the test - val countdownLatch = new CountDownLatch(1) - var failString: String = null - - val mockMessageSink = mock(classOf[MessageSink]) - when(mockMessageSink.needsMoreMessages(any())).thenReturn(true) - - val doNothingMetrics = new KafkaSystemConsumerMetrics() - - val tp = new TopicAndPartition("topic", 42) - - val mockOffsetGetter = mock(classOf[GetOffset]) - // This will be used by the simple consumer below, and this is the response that simple consumer needs - when(mockOffsetGetter.isValidOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp), Matchers.eq("0"))).thenReturn(true) - when(mockOffsetGetter.getResetOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp))).thenReturn(1492l) - - var callsToCreateSimpleConsumer = 0 - val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer]) - - // Create an answer that first indicates offset out of range on first invocation and on second - // verifies that the parameters have been updated to what we expect them to be - val answer = new Answer[FetchResponse]() { - var invocationCount = 0 - - def answer(invocation: InvocationOnMock): FetchResponse = { - val arguments = invocation.getArguments()(0).asInstanceOf[List[Object]](0).asInstanceOf[(String, Long)] - - if (invocationCount == 0) { - if (arguments !=(tp, 0)) { - failString = "First invocation did not have the right arguments: " + arguments - countdownLatch.countDown() - } - val mfr = mock(classOf[FetchResponse]) - when(mfr.hasError).thenReturn(true) - when(mfr.error("topic", 42)).thenReturn(Errors.OFFSET_OUT_OF_RANGE) - - val messageSet = mock(classOf[MessageSet]) - when(messageSet.iterator).thenReturn(Iterator.empty) - val response = mock(classOf[FetchResponsePartitionData]) - when(response.error).thenReturn(Errors.OFFSET_OUT_OF_RANGE) - val responseMap = Map(tp -> response) - when(mfr.data).thenReturn(responseMap.toSeq) - invocationCount += 1 - mfr - } else { - if (arguments !=(tp, 1492)) { - failString = "On second invocation, arguments were not correct: " + arguments - } - countdownLatch.countDown() - Thread.currentThread().interrupt() - null - } - } - } - - 
when(mockSimpleConsumer.defaultFetch(any())).thenAnswer(answer) - - // So now we have a fetch response that will fail. Prime the mockGetOffset to send us to a new offset - - val bp = new BrokerProxy("host", 423, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) { - - override def createSimpleConsumer() = { - if (callsToCreateSimpleConsumer > 1) { - failString = "Tried to create more than one simple consumer" - countdownLatch.countDown() - } - callsToCreateSimpleConsumer += 1 - mockSimpleConsumer - } - } - - bp.addTopicPartition(tp, Option("0")) - bp.start - countdownLatch.await() - bp.stop - if (failString != null) { - fail(failString) - } - } - - /** - * Test that makes sure that BrokerProxy abdicates all TopicAndPartitions - * that it owns when a consumer failure occurs. - */ - @Test def brokerProxyAbdicatesOnConnectionFailure(): Unit = { - val countdownLatch = new CountDownLatch(1) - var abdicated: Option[TopicAndPartition] = None - @volatile var refreshDroppedCount = 0 - val mockMessageSink = new MessageSink { - override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) { - } - - override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) { - } - - override def abdicate(tp: TopicAndPartition, nextOffset: Long) { - abdicated = Some(tp) - countdownLatch.countDown - } - - override def refreshDropped() { - refreshDroppedCount += 1 - } - - override def needsMoreMessages(tp: TopicAndPartition): Boolean = { - true - } - } - - val doNothingMetrics = new KafkaSystemConsumerMetrics() - val tp = new TopicAndPartition("topic", 42) - val mockOffsetGetter = mock(classOf[GetOffset]) - val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer]) - - when(mockOffsetGetter.isValidOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp), Matchers.eq("0"))).thenReturn(true) - when(mockOffsetGetter.getResetOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp))).thenReturn(1492l) - when(mockSimpleConsumer.defaultFetch(any())).thenThrow(new SamzaException("Pretend this is a ClosedChannelException. Can't use ClosedChannelException because it's checked, and Mockito doesn't like that.")) - - val bp = new BrokerProxy("host", 567, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) { - override def createSimpleConsumer() = { - mockSimpleConsumer - } - } - - val waitForRefresh = () => { - val currentRefreshDroppedCount = refreshDroppedCount - while (refreshDroppedCount == currentRefreshDroppedCount) { - Thread.sleep(100) - } - } - - bp.addTopicPartition(tp, Option("0")) - bp.start - // BP should refresh on startup. - waitForRefresh() - countdownLatch.await() - // BP should continue refreshing after it's abdicated all TopicAndPartitions. 
- waitForRefresh() - bp.stop - assertEquals(tp, abdicated.getOrElse(null)) - } - - @Test def brokerProxyAbdicatesHardErrors(): Unit = { - val doNothingMetrics = new KafkaSystemConsumerMetrics - val mockMessageSink = new MessageSink { - override def needsMoreMessages(tp: TopicAndPartition): Boolean = true - override def abdicate(tp: TopicAndPartition, nextOffset: Long) {} - override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) {} - override def refreshDropped() {throw new OutOfMemoryError("Test - OOME")} - override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean): Unit = {} - } - val mockOffsetGetter = mock(classOf[GetOffset]) - val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer]) - - val bp = new BrokerProxy("host", 658, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) { - override def createSimpleConsumer() = { - mockSimpleConsumer - } - } - var caughtError = false - try { - bp.thread.run - } catch { - case e: SamzaException => { - assertEquals(e.getMessage, "Got out of memory error in broker proxy thread.") - info("Received OutOfMemoryError in broker proxy.") - caughtError = true - } - } - assertEquals(true, caughtError) - val mockMessageSink2 = new MessageSink { - override def needsMoreMessages(tp: TopicAndPartition): Boolean = true - override def abdicate(tp: TopicAndPartition, nextOffset: Long): Unit = {} - override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long): Unit = {} - override def refreshDropped(): Unit = {throw new StackOverflowError("Test - SOE")} - override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean): Unit = {} - } - caughtError = false - val bp2 = new BrokerProxy("host", 689, "system", "clientID2", doNothingMetrics, mockMessageSink2, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) { - override def createSimpleConsumer() = { - mockSimpleConsumer - } - } - try { - bp2.thread.run - } catch { - case e: SamzaException => { - assertEquals(e.getMessage, "Got stack overflow error in broker proxy thread.") - info("Received StackOverflowError in broker proxy.") - caughtError = true - } - } - assertEquals(true, caughtError) - } - - @Test - def brokerProxyStopCloseConsumer: Unit = { - val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer]) - val bp = new BrokerProxy("host", 0, "system", "clientID", new KafkaSystemConsumerMetrics(), null){ - override def createSimpleConsumer() = { - mockSimpleConsumer - } - } - bp.start - bp.stop - verify(mockSimpleConsumer).close - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java new file mode 100644 index 0000000..5791545 --- /dev/null +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java @@ -0,0 +1,220 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.samza.system.kafka; + +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.samza.Partition; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.KafkaConfig; +import org.apache.samza.config.KafkaConsumerConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.Clock; +import org.apache.samza.util.NoOpMetricsRegistry; +import org.junit.Assert; +import org.junit.Test; + +import static org.junit.Assert.*; + + +public class TestKafkaSystemConsumer { + public final String TEST_SYSTEM = "test-system"; + public final String TEST_STREAM = "test-stream"; + public final String TEST_CLIENT_ID = "testClientId"; + public final String BOOTSTRAP_SERVER = "127.0.0.1:8888"; + public final String FETCH_THRESHOLD_MSGS = "50000"; + public final String FETCH_THRESHOLD_BYTES = "100000"; + + private KafkaSystemConsumer createConsumer(String fetchMsg, String fetchBytes) { + final Map<String, String> map = new HashMap<>(); + + map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD(), TEST_SYSTEM), fetchMsg); + map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES(), TEST_SYSTEM), fetchBytes); + map.put(String.format("systems.%s.consumer.%s", TEST_SYSTEM, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), + BOOTSTRAP_SERVER); + map.put(JobConfig.JOB_NAME(), "jobName"); + + Config config = new MapConfig(map); + KafkaConsumerConfig consumerConfig = + KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, TEST_SYSTEM, TEST_CLIENT_ID); + final KafkaConsumer<byte[], byte[]> kafkaConsumer = new MockKafkaConsumer(consumerConfig); + + MockKafkaSystemConsumer newKafkaSystemConsumer = + new MockKafkaSystemConsumer(kafkaConsumer, TEST_SYSTEM, config, TEST_CLIENT_ID, + new KafkaSystemConsumerMetrics(TEST_SYSTEM, new NoOpMetricsRegistry()), System::currentTimeMillis); + + return newKafkaSystemConsumer; + } + + @Test + public void testConfigValidations() { + + final KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES); + + consumer.start(); + // should be no failures + } + + @Test + public void testFetchThresholdShouldDivideEvenlyAmongPartitions() { + final KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES); + final int partitionsNum = 50; + for (int i = 0; i < partitionsNum; i++) { + consumer.register(new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(i)), "0"); + } + + consumer.start(); + + Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum, 
consumer.perPartitionFetchThreshold); + Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_BYTES) / 2 / partitionsNum, + consumer.perPartitionFetchThresholdBytes); + + consumer.stop(); + } + + @Test + public void testConsumerRegisterOlderOffsetOfTheSamzaSSP() { + + KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES); + + SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0)); + SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1)); + SystemStreamPartition ssp2 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(2)); + + consumer.register(ssp0, "0"); + consumer.register(ssp0, "5"); + consumer.register(ssp1, "2"); + consumer.register(ssp1, "3"); + consumer.register(ssp2, "0"); + + assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp0))); + assertEquals("2", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp1))); + assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp2))); + } + + @Test + public void testFetchThresholdBytes() { + + SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0)); + SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1)); + int partitionsNum = 2; + int ime0Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum; // fake size + int ime1Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum - 1; // fake size + int ime11Size = 20; + ByteArraySerializer bytesSerde = new ByteArraySerializer(); + IncomingMessageEnvelope ime0 = new IncomingMessageEnvelope(ssp0, "0", bytesSerde.serialize("", "key0".getBytes()), + bytesSerde.serialize("", "value0".getBytes()), ime0Size); + IncomingMessageEnvelope ime1 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key1".getBytes()), + bytesSerde.serialize("", "value1".getBytes()), ime1Size); + IncomingMessageEnvelope ime11 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key11".getBytes()), + bytesSerde.serialize("", "value11".getBytes()), ime11Size); + KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES); + + consumer.register(ssp0, "0"); + consumer.register(ssp1, "0"); + consumer.start(); + consumer.messageSink.addMessage(ssp0, ime0); + // queue for ssp0 should be full now, because we added a message of size FETCH_THRESHOLD_MSGS/partitionsNum + Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp0)); + consumer.messageSink.addMessage(ssp1, ime1); + // queue for ssp1 should be less than full now, because we added a message of size (FETCH_THRESHOLD_MSGS/partitionsNum - 1) + Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp1)); + consumer.messageSink.addMessage(ssp1, ime11); + // queue for ssp1 should be full now, because we added a message of size 20 on top + Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp1)); + + Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0)); + Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1)); + Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0)); + Assert.assertEquals(ime1Size + ime11Size, consumer.getMessagesSizeInQueue(ssp1)); + + consumer.stop(); + } + + @Test + public void testFetchThresholdBytesDisabled() { + // Pass 0 as fetchThresholdBytes, which disables checking the limit by size + + SystemStreamPartition
ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0)); + SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1)); + int partitionsNum = 2; + int ime0Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum; // fake size, up to the limit + int ime1Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum - 100; // fake size, below the limit + int ime11Size = 20; // even with the second message still below the size limit + ByteArraySerializer bytesSerde = new ByteArraySerializer(); + IncomingMessageEnvelope ime0 = new IncomingMessageEnvelope(ssp0, "0", bytesSerde.serialize("", "key0".getBytes()), + bytesSerde.serialize("", "value0".getBytes()), ime0Size); + IncomingMessageEnvelope ime1 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key1".getBytes()), + bytesSerde.serialize("", "value1".getBytes()), ime1Size); + IncomingMessageEnvelope ime11 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key11".getBytes()), + bytesSerde.serialize("", "value11".getBytes()), ime11Size); + + // limit by number of messages: 4/2 = 2 per partition + // limit by number of bytes: disabled + KafkaSystemConsumer consumer = createConsumer("4", "0"); // "0" disables the byte limit + + consumer.register(ssp0, "0"); + consumer.register(ssp1, "0"); + consumer.start(); + consumer.messageSink.addMessage(ssp0, ime0); + // would be full by size (if enabled), but not full by number of messages (1 of 2) + Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp0)); + consumer.messageSink.addMessage(ssp1, ime1); + // full neither by size nor by number of messages + Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp1)); + consumer.messageSink.addMessage(ssp1, ime11); + // not full by size, but should be full by number of messages + Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp1)); + + Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0)); + Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1)); + Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0)); + Assert.assertEquals(ime1Size + ime11Size, consumer.getMessagesSizeInQueue(ssp1)); + + consumer.stop(); + } + + // mock KafkaConsumer and SystemConsumer + static class MockKafkaConsumer extends KafkaConsumer { + public MockKafkaConsumer(Map<String, Object> configs) { + super(configs); + } + } + + static class MockKafkaSystemConsumer extends KafkaSystemConsumer { + public MockKafkaSystemConsumer(Consumer kafkaConsumer, String systemName, Config config, String clientId, + KafkaSystemConsumerMetrics metrics, Clock clock) { + super(kafkaConsumer, systemName, config, clientId, metrics, clock); + } + + @Override + void startConsumer() { + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala deleted file mode 100644 index 8656d10..0000000 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.system.kafka - -import kafka.api.TopicMetadata -import kafka.api.PartitionMetadata -import kafka.cluster.Broker -import kafka.common.TopicAndPartition -import kafka.message.Message -import kafka.message.MessageAndOffset -import org.apache.kafka.common.protocol.Errors -import org.apache.samza.system.IncomingMessageEnvelope -import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.Partition -import org.apache.samza.util.TopicMetadataStore -import org.junit.Test -import org.junit.Assert._ -import org.apache.samza.system.SystemAdmin -import org.mockito.Mockito._ -import org.mockito.Matchers._ - -class TestKafkaSystemConsumer { - val systemAdmin: SystemAdmin = mock(classOf[KafkaSystemAdmin]) - private val SSP: SystemStreamPartition = new SystemStreamPartition("test", "test", new Partition(0)) - private val envelope: IncomingMessageEnvelope = new IncomingMessageEnvelope(SSP, null, null, null) - private val envelopeWithSize: IncomingMessageEnvelope = new IncomingMessageEnvelope(SSP, null, null, null, 100) - private val clientId = "TestClientId" - - @Test - def testFetchThresholdShouldDivideEvenlyAmongPartitions { - val metadataStore = new MockMetadataStore - val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId, fetchThreshold = 50000) { - override def refreshBrokers { - } - } - - for (i <- 0 until 50) { - consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0") - } - - consumer.start - - assertEquals(1000, consumer.perPartitionFetchThreshold) - } - - @Test - def testBrokerCreationShouldTriggerStart { - val systemName = "test-system" - val streamName = "test-stream" - val metrics = new KafkaSystemConsumerMetrics - // Lie and tell the store that the partition metadata is empty. We can't - // use partition metadata because it has Broker in its constructor, which - // is package private to Kafka. - val metadataStore = new MockMetadataStore(Map(streamName -> TopicMetadata(streamName, Seq.empty, Errors.NONE))) - var hosts = List[String]() - var getHostPortCount = 0 - val consumer = new KafkaSystemConsumer(systemName, systemAdmin, metrics, metadataStore, clientId) { - override def getLeaderHostPort(partitionMetadata: Option[PartitionMetadata]): Option[(String, Int)] = { - // Generate a unique host every time getHostPort is called. - getHostPortCount += 1 - Some("localhost-%s" format getHostPortCount, 0) - } - - override def createBrokerProxy(host: String, port: Int): BrokerProxy = { - new BrokerProxy(host, port, systemName, "", metrics, sink) { - override def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = { - // Skip this since we normally do verification of offsets, which - // tries to connect to Kafka. Rather than mock that, just forget it. 
- nextOffsets.size - } - - override def start { - hosts :+= host - } - } - } - } - - consumer.register(new SystemStreamPartition(systemName, streamName, new Partition(0)), "1") - assertEquals(0, hosts.size) - consumer.start - assertEquals(List("localhost-1"), hosts) - // Should trigger a refresh with a new host. - consumer.sink.abdicate(new TopicAndPartition(streamName, 0), 2) - assertEquals(List("localhost-1", "localhost-2"), hosts) - } - - @Test - def testConsumerRegisterOlderOffsetOfTheSamzaSSP { - when(systemAdmin.offsetComparator(anyString, anyString)).thenCallRealMethod() - - val metadataStore = new MockMetadataStore - val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId, fetchThreshold = 50000) - val ssp0 = new SystemStreamPartition("test-system", "test-stream", new Partition(0)) - val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(1)) - val ssp2 = new SystemStreamPartition("test-system", "test-stream", new Partition(2)) - - consumer.register(ssp0, "0") - consumer.register(ssp0, "5") - consumer.register(ssp1, "2") - consumer.register(ssp1, "3") - consumer.register(ssp2, "0") - - assertEquals("0", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp0))) - assertEquals("2", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp1))) - assertEquals("0", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp2))) - } - - @Test - def testFetchThresholdBytesShouldDivideEvenlyAmongPartitions { - val metadataStore = new MockMetadataStore - val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId, - fetchThreshold = 50000, fetchThresholdBytes = 60000L, fetchLimitByBytesEnabled = true) { - override def refreshBrokers { - } - } - - for (i <- 0 until 10) { - consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0") - } - - consumer.start - - assertEquals(5000, consumer.perPartitionFetchThreshold) - assertEquals(3000, consumer.perPartitionFetchThresholdBytes) - } - - @Test - def testFetchThresholdBytes { - val metadataStore = new MockMetadataStore - val consumer = new KafkaSystemConsumer("test-system", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId, - fetchThreshold = 50000, fetchThresholdBytes = 60000L, fetchLimitByBytesEnabled = true) { - override def refreshBrokers { - } - } - - for (i <- 0 until 10) { - consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0") - } - - consumer.start - - val msg = Array[Byte](5, 112, 9, 126) - val msgAndOffset: MessageAndOffset = MessageAndOffset(new Message(msg), 887654) - // 4 data + 18 Message overhead + 80 IncomingMessageEnvelope overhead - consumer.sink.addMessage(new TopicAndPartition("test-stream", 0), msgAndOffset, 887354) - - assertEquals(106, consumer.getMessagesSizeInQueue(new SystemStreamPartition("test-system", "test-stream", new Partition(0)))) - } - - @Test - def testFetchThresholdBytesDisabled { - val metadataStore = new MockMetadataStore - val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId, - fetchThreshold = 50000, fetchThresholdBytes = 60000L) { - override def refreshBrokers { - } - } - - for (i <- 0 until 10) { - consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0") - } - - consumer.start - - assertEquals(5000, 
http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index 864d2e5..8405c63 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.samza.config._
 import org.apache.samza.container.TaskName
-import org.apache.samza.job.local.ThreadJobFactory
+import org.apache.samza.job.local.{ThreadJob, ThreadJobFactory}
 import org.apache.samza.job.model.{ContainerModel, JobModel}
 import org.apache.samza.job.{ApplicationStatus, JobRunner, StreamJob}
 import org.apache.samza.metrics.MetricsRegistryMap
@@ -223,6 +223,13 @@ class StreamTaskTestUtil {
    * interrupt, which is forwarded on to ThreadJob, and marked as a failure).
    */
  def stopJob(job: StreamJob) {
+    // Make sure we don't kill the job before it has started.
+    // eventProcessed guarantees all the consumers have been initialized.
+    val tasks = TestTask.tasks
+    val task = tasks.values.toList.head
+    task.eventProcessed.await(60, TimeUnit.SECONDS)
+    assertEquals(0, task.eventProcessed.getCount)
+
     // Shutdown task.
     job.kill

     val status = job.waitForFinish(60000)
@@ -279,7 +286,10 @@ class StreamTaskTestUtil {
     val taskConfig = new TaskConfig(jobModel.getConfig)
     val checkpointManager = taskConfig.getCheckpointManager(new MetricsRegistryMap())
     checkpointManager match {
-      case Some(checkpointManager) => checkpointManager.createResources
+      case Some(checkpointManager) => {
+        checkpointManager.createResources
+        checkpointManager.stop
+      }
       case _ => assert(checkpointManager != null, "No checkpoint manager factory configured")
     }
@@ -323,6 +333,7 @@ object TestTask {
 abstract class TestTask extends StreamTask with InitableTask {
   var received = ArrayBuffer[String]()
   val initFinished = new CountDownLatch(1)
+  val eventProcessed = new CountDownLatch(1)
   @volatile var gotMessage = new CountDownLatch(1)

   def init(config: Config, context: TaskContext) {
@@ -334,6 +345,8 @@ abstract class TestTask extends StreamTask with InitableTask {
   def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
     val msg = envelope.getMessage.asInstanceOf[String]
+    eventProcessed.countDown()
+
     System.err.println("TestTask.process(): %s" format msg)

     received += msg
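
The eventProcessed latch added above is a standard CountDownLatch handshake: process() counts down on the first message, and stopJob() blocks until that happens, so the job is never killed before its consumers have delivered anything. A self-contained sketch of the pattern, with the worker thread and its simulated work as illustrative stand-ins:

import java.util.concurrent.{CountDownLatch, TimeUnit}

object LatchHandshake {
  def main(args: Array[String]): Unit = {
    val eventProcessed = new CountDownLatch(1)

    // Stand-in for the task thread: process one event, then signal.
    val worker = new Thread(() => eventProcessed.countDown())
    worker.start()

    // Stand-in for stopJob(): wait up to 60 seconds, then verify the
    // latch actually fired rather than the await merely timing out.
    eventProcessed.await(60, TimeUnit.SECONDS)
    assert(eventProcessed.getCount == 0)
  }
}

Checking getCount after a timed await is what distinguishes "latch fired" from "sixty seconds elapsed", which is why the diff adds both lines rather than the await alone.
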
http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
index e4d47d1..ccb7cd4 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
@@ -77,7 +77,6 @@ class TestShutdownStatefulTask extends StreamTaskTestUtil {
     val (job, task) = startJob

     // Validate that restored is empty.
-    assertEquals(0, task.initFinished.getCount)
     assertEquals(0, task.asInstanceOf[ShutdownStateStoreTask].restored.size)
     assertEquals(0, task.received.size)
@@ -88,7 +87,6 @@ class TestShutdownStatefulTask extends StreamTaskTestUtil {
     send(task, "2")
     send(task, "99")
     send(task, "99")
-
     stopJob(job)
   }
@@ -120,7 +118,7 @@ class ShutdownStateStoreTask extends TestTask {
       .asInstanceOf[KeyValueStore[String, String]]
     val iter = store.all
     iter.asScala.foreach( p => restored += (p.getKey -> p.getValue))
-    System.err.println("ShutdownStateStoreTask.createStream(): %s" format restored)
+    System.out.println("ShutdownStateStoreTask.createStream(): %s" format restored)
     iter.close
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
index 0b405f0..b30b896 100644
--- a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
+++ b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
@@ -157,7 +157,7 @@ public class YarnJobValidationTool {
     coordinatorStreamManager.start();
     coordinatorStreamManager.bootstrap();
     ChangelogStreamManager changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager);
-    JobModelManager jobModelManager = JobModelManager.apply(coordinatorStreamManager, changelogStreamManager.readPartitionMapping());
+    JobModelManager jobModelManager = JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping());
     validator.init(config);
     Map<String, String> jmxUrls = jobModelManager.jobModel().getAllContainerToHostValues(SetContainerHostMapping.JMX_TUNNELING_URL_KEY);
     for (Map.Entry<String, String> entry : jmxUrls.entrySet()) {

http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
index da23b91..1ad4522 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
@@ -106,7 +106,9 @@ class TestSamzaYarnAppMasterService {
     coordinatorStreamManager.start
     coordinatorStreamManager.bootstrap
     val changelogPartitionManager = new ChangelogStreamManager(coordinatorStreamManager)
-    JobModelManager(coordinatorStreamManager, changelogPartitionManager.readPartitionMapping())
+    val jobModelManager = JobModelManager(coordinatorStreamManager.getConfig, changelogPartitionManager.readPartitionMapping())
+    coordinatorStreamManager.stop()
+    jobModelManager
   }

   private def getDummyConfig: Config = new MapConfig(Map[String, String](
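
The two YARN changes above are the same refactor: JobModelManager is now built from the bootstrapped Config rather than from the CoordinatorStreamManager itself, so the caller keeps ownership of the manager's lifecycle and must stop it once the config has been read. A hedged sketch of that ownership pattern; the Stoppable trait and helper below are stand-ins for illustration, not Samza classes:

// Stand-in for any resource with an explicit lifecycle, such as a
// coordinator stream manager holding an open consumer.
trait Stoppable {
  def stop(): Unit
}

// Loan pattern: the caller that started the resource also stops it, and
// the value built from it (e.g. a job model) safely outlives the resource
// because it no longer holds a reference to it.
def buildThenStop[R <: Stoppable, A](resource: R)(build: R => A): A =
  try build(resource) finally resource.stop()
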
