// http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
// ----------------------------------------------------------------------
// diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
// new file mode 100644
// index 0000000..84bebef
// --- /dev/null
// +++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
// @@ -0,0 +1,596 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
 * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
 * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package kafka.api

import java.util.regex.Pattern
import java.util

import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.TopicPartition

import kafka.utils.{TestUtils, Logging}
import kafka.server.KafkaConfig

import java.util.ArrayList
import org.junit.Assert._
import org.junit.{Test, Before}

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import kafka.coordinator.ConsumerCoordinator

/**
 * Integration tests for the new consumer that cover basic usage as well as server failures.
 *
 * The harness supplies `consumers`, `producers`, `servers`, `zkUtils` and the three config objects;
 * this class only configures them and creates a two-partition test topic in `setUp`.
 */
abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {

  val producerCount = 1
  val consumerCount = 2
  val serverCount = 3

  val topic = "topic"
  val part = 0
  val tp = new TopicPartition(topic, part)
  val part2 = 1
  val tp2 = new TopicPartition(topic, part2)

  // configure the servers and clients
  this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
  this.serverConfig.setProperty(KafkaConfig.ConsumerMinSessionTimeoutMsProp, "100") // set small enough session timeout
  this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
  this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
  this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100")

  @Before
  override def setUp() {
    super.setUp()

    // create the test topic with all the brokers as replicas
    TestUtils.createTopic(this.zkUtils, topic, 2, serverCount, this.servers)
  }

  /** Consumes 10000 records from a manually assigned partition, then checks async commit callbacks. */
  @Test
  def testSimpleConsumption() {
    val numRecords = 10000
    sendRecords(numRecords)

    assertEquals(0, this.consumers(0).assignment.size)
    this.consumers(0).assign(List(tp))
    assertEquals(1, this.consumers(0).assignment.size)

    this.consumers(0).seek(tp, 0)
    consumeAndVerifyRecords(this.consumers(0), numRecords = numRecords, startingOffset = 0)

    // check async commit callbacks
    val commitCallback = new CountConsumerCommitCallback()
    this.consumers(0).commitAsync(commitCallback)

    // shouldn't make progress until poll is invoked
    Thread.sleep(10)
    assertEquals(0, commitCallback.count)
    awaitCommitCallback(this.consumers(0), commitCallback)
  }

  /** With auto-commit enabled, close() must commit the positions set via seek(). */
  @Test
  def testAutoCommitOnClose() {
    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())

    val numRecords = 10000
    sendRecords(numRecords)

    consumer0.subscribe(List(topic))
    awaitAssignment(consumer0, Set(tp, tp2))

    // should auto-commit seeked positions before closing
    consumer0.seek(tp, 300)
    consumer0.seek(tp2, 500)
    consumer0.close()

    // now we should see the committed positions from another consumer
    assertEquals(300, this.consumers(0).committed(tp).offset)
    assertEquals(500, this.consumers(0).committed(tp2).offset)
  }

  /** Same as testAutoCommitOnClose, but wakeup() is called before close(). */
  @Test
  def testAutoCommitOnCloseAfterWakeup() {
    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())

    val numRecords = 10000
    sendRecords(numRecords)

    consumer0.subscribe(List(topic))
    awaitAssignment(consumer0, Set(tp, tp2))

    // should auto-commit seeked positions before closing
    consumer0.seek(tp, 300)
    consumer0.seek(tp2, 500)

    // wakeup the consumer before closing to simulate trying to break a poll
    // loop from another thread
    consumer0.wakeup()
    consumer0.close()

    // now we should see the committed positions from another consumer
    assertEquals(300, this.consumers(0).committed(tp).offset)
    assertEquals(500, this.consumers(0).committed(tp2).offset)
  }

  /** With auto-commit enabled, a rebalance triggered by a subscription change must commit seeked positions. */
  @Test
  def testAutoCommitOnRebalance() {
    val topic2 = "topic2"
    TestUtils.createTopic(this.zkUtils, topic2, 2, serverCount, this.servers)

    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())

    // this consumer is not owned by the harness, so release it even when an assertion fails
    try {
      val numRecords = 10000
      sendRecords(numRecords)

      consumer0.subscribe(List(topic))
      awaitAssignment(consumer0, Set(tp, tp2))

      consumer0.seek(tp, 300)
      consumer0.seek(tp2, 500)

      // change subscription to trigger rebalance
      consumer0.subscribe(List(topic, topic2))
      awaitAssignment(consumer0, Set(tp, tp2, new TopicPartition(topic2, 0), new TopicPartition(topic2, 1)))

      // after rebalancing, we should have reset to the committed positions
      assertEquals(300, consumer0.committed(tp).offset)
      assertEquals(500, consumer0.committed(tp2).offset)
    } finally {
      consumer0.close()
    }
  }

  /** Pattern subscription must pick up matching topics, including one created after subscribing. */
  @Test
  def testPatternSubscription() {
    val numRecords = 10000
    sendRecords(numRecords)

    val topic1: String = "tblablac" // matches subscribed pattern
    TestUtils.createTopic(this.zkUtils, topic1, 2, serverCount, this.servers)
    sendRecords(1000, new TopicPartition(topic1, 0))
    sendRecords(1000, new TopicPartition(topic1, 1))

    val topic2: String = "tblablak" // does not match subscribed pattern
    TestUtils.createTopic(this.zkUtils, topic2, 2, serverCount, this.servers)
    sendRecords(1000, new TopicPartition(topic2, 0))
    sendRecords(1000, new TopicPartition(topic2, 1))

    val topic3: String = "tblab1" // does not match subscribed pattern
    TestUtils.createTopic(this.zkUtils, topic3, 2, serverCount, this.servers)
    sendRecords(1000, new TopicPartition(topic3, 0))
    sendRecords(1000, new TopicPartition(topic3, 1))

    assertEquals(0, this.consumers(0).assignment().size)

    val pattern: Pattern = Pattern.compile("t.*c")
    this.consumers(0).subscribe(pattern, new TestConsumerReassignmentListener)
    this.consumers(0).poll(50)

    val initialSubscriptions = Set(
      new TopicPartition(topic, 0),
      new TopicPartition(topic, 1),
      new TopicPartition(topic1, 0),
      new TopicPartition(topic1, 1))
    awaitAssignment(this.consumers(0), initialSubscriptions)

    val topic4: String = "tsomec" // matches subscribed pattern
    TestUtils.createTopic(this.zkUtils, topic4, 2, serverCount, this.servers)
    sendRecords(1000, new TopicPartition(topic4, 0))
    sendRecords(1000, new TopicPartition(topic4, 1))

    val expandedSubscriptions = initialSubscriptions ++ Set(
      new TopicPartition(topic4, 0),
      new TopicPartition(topic4, 1))
    awaitAssignment(this.consumers(0), expandedSubscriptions)

    this.consumers(0).unsubscribe()
    assertEquals(0, this.consumers(0).assignment().size)
  }

  /** unsubscribe() after a pattern subscription must clear the assignment. */
  @Test
  def testPatternUnsubscription() {
    val numRecords = 10000
    sendRecords(numRecords)

    val topic1: String = "tblablac" // matches subscribed pattern
    TestUtils.createTopic(this.zkUtils, topic1, 2, serverCount, this.servers)
    sendRecords(1000, new TopicPartition(topic1, 0))
    sendRecords(1000, new TopicPartition(topic1, 1))

    assertEquals(0, this.consumers(0).assignment().size)

    this.consumers(0).subscribe(Pattern.compile("t.*c"), new TestConsumerReassignmentListener)
    this.consumers(0).poll(50)

    val subscriptions = Set(
      new TopicPartition(topic, 0),
      new TopicPartition(topic, 1),
      new TopicPartition(topic1, 0),
      new TopicPartition(topic1, 1))
    awaitAssignment(this.consumers(0), subscriptions)

    this.consumers(0).unsubscribe()
    assertEquals(0, this.consumers(0).assignment().size)
  }

  /** Committing explicit offsets (sync and async) must update committed() without moving position(). */
  @Test
  def testCommitSpecifiedOffsets() {
    sendRecords(5, tp)
    sendRecords(7, tp2)

    this.consumers(0).assign(List(tp, tp2))

    // Need to poll to join the group
    this.consumers(0).poll(50)
    val pos1 = this.consumers(0).position(tp)
    val pos2 = this.consumers(0).position(tp2)
    this.consumers(0).commitSync(Map[TopicPartition,OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava)
    assertEquals(3, this.consumers(0).committed(tp).offset)
    assertNull(this.consumers(0).committed(tp2))

    // positions should not change
    assertEquals(pos1, this.consumers(0).position(tp))
    assertEquals(pos2, this.consumers(0).position(tp2))
    this.consumers(0).commitSync(Map[TopicPartition,OffsetAndMetadata]((tp2, new OffsetAndMetadata(5L))).asJava)
    assertEquals(3, this.consumers(0).committed(tp).offset)
    assertEquals(5, this.consumers(0).committed(tp2).offset)

    // Using async should pick up the committed changes after commit completes
    val commitCallback = new CountConsumerCommitCallback()
    this.consumers(0).commitAsync(Map[TopicPartition,OffsetAndMetadata]((tp2, new OffsetAndMetadata(7L))).asJava, commitCallback)
    awaitCommitCallback(this.consumers(0), commitCallback)
    assertEquals(7, this.consumers(0).committed(tp2).offset)
  }

  /** Relies on the "earliest" reset policy configured above to start consumption at offset 0. */
  @Test
  def testAutoOffsetReset() {
    sendRecords(1)
    this.consumers(0).assign(List(tp))
    consumeAndVerifyRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
  }

  /** Exercises seekToEnd, seekToBeginning and seek to an explicit offset. */
  @Test
  def testSeek() {
    val consumer = this.consumers(0)
    val totalRecords = 50L
    sendRecords(totalRecords.toInt)
    consumer.assign(List(tp))

    consumer.seekToEnd(tp)
    assertEquals(totalRecords, consumer.position(tp))
    // 50 ms poll timeout; the original passed the record count (also 50) as the timeout
    assertFalse(consumer.poll(50).iterator().hasNext)

    consumer.seekToBeginning(tp)
    // compare as longs; the three-argument form resolved to the double/delta overload
    assertEquals(0L, consumer.position(tp))
    consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0)

    val mid = totalRecords / 2
    consumer.seek(tp, mid)
    assertEquals(mid, consumer.position(tp))
    consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = mid.toInt, startingKeyAndValueIndex = mid.toInt)
  }

  /** Consumption through group subscription rather than manual assignment. */
  @Test
  def testGroupConsumption() {
    sendRecords(10)
    this.consumers(0).subscribe(List(topic))
    consumeAndVerifyRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
  }

  /** Commit metadata must round-trip through both sync and async commits. */
  @Test
  def testCommitMetadata() {
    this.consumers(0).assign(List(tp))

    // sync commit
    val syncMetadata = new OffsetAndMetadata(5, "foo")
    this.consumers(0).commitSync(Map((tp, syncMetadata)))
    assertEquals(syncMetadata, this.consumers(0).committed(tp))

    // async commit
    val asyncMetadata = new OffsetAndMetadata(10, "bar")
    val callback = new CountConsumerCommitCallback
    this.consumers(0).commitAsync(Map((tp, asyncMetadata)), callback)
    awaitCommitCallback(this.consumers(0), callback)

    assertEquals(asyncMetadata, this.consumers(0).committed(tp))
  }

  // NOTE(review): unlike every other test method in this class, this one carries no @Test annotation,
  // so a JUnit 4 runner will not execute it. Confirm whether that is intentional (and that the
  // expectations below still hold) before enabling it.
  def testPositionAndCommit() {
    sendRecords(5)

    // committed() on a partition with no committed offset throws an exception
    intercept[NoOffsetForPartitionException] {
      this.consumers(0).committed(new TopicPartition(topic, 15))
    }

    // position() on a partition that we aren't subscribed to throws an exception
    intercept[IllegalArgumentException] {
      this.consumers(0).position(new TopicPartition(topic, 15))
    }

    this.consumers(0).assign(List(tp))

    assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers(0).position(tp))
    this.consumers(0).commitSync()
    assertEquals(0L, this.consumers(0).committed(tp).offset)

    consumeAndVerifyRecords(this.consumers(0), 5, 0)
    assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers(0).position(tp))
    this.consumers(0).commitSync()
    assertEquals("Committed offset should be returned", 5L, this.consumers(0).committed(tp).offset)

    sendRecords(1)

    // another consumer in the same group should get the same position
    this.consumers(1).assign(List(tp))
    consumeAndVerifyRecords(this.consumers(1), 1, 5)
  }

  /** partitionsFor() must report partitions of an existing topic and null for an unknown one. */
  @Test
  def testPartitionsFor() {
    val numParts = 2
    TestUtils.createTopic(this.zkUtils, "part-test", numParts, 1, this.servers)
    val parts = this.consumers(0).partitionsFor("part-test")
    assertNotNull(parts)
    assertEquals(2, parts.size)
    assertNull(this.consumers(0).partitionsFor("non-exist-topic"))
  }

  /** listTopics() must include the three topics created here; the test expects 5 topics in total. */
  @Test
  def testListTopics() {
    val numParts = 2
    val topic1: String = "part-test-topic-1"
    val topic2: String = "part-test-topic-2"
    val topic3: String = "part-test-topic-3"
    TestUtils.createTopic(this.zkUtils, topic1, numParts, 1, this.servers)
    TestUtils.createTopic(this.zkUtils, topic2, numParts, 1, this.servers)
    TestUtils.createTopic(this.zkUtils, topic3, numParts, 1, this.servers)

    val topics = this.consumers.head.listTopics()
    assertNotNull(topics)
    assertEquals(5, topics.size())
    assertEquals(5, topics.keySet().size())
    assertEquals(2, topics.get(topic1).size)
    assertEquals(2, topics.get(topic2).size)
    assertEquals(2, topics.get(topic3).size)
  }

  /** Shutting down the broker that leads the offsets topic must trigger a second rebalance callback. */
  @Test
  def testPartitionReassignmentCallback() {
    val listener = new TestConsumerReassignmentListener()
    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
    this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())

    try {
      consumer0.subscribe(List(topic), listener)

      // the initial subscription should cause a callback execution
      while (listener.callsToAssigned == 0)
        consumer0.poll(50)

      // get metadata for the topic; null-check the Java list itself, because wrapping a null list
      // with asScala can yield a non-null wrapper and make the check ineffective
      var parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName)
      while (parts == null)
        parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName)
      assertEquals(1, parts.size)
      assertNotNull(parts.get(0).leader())

      // shutdown the coordinator
      val coordinator = parts.get(0).leader().id()
      this.servers(coordinator).shutdown()

      // this should cause another callback execution
      while (listener.callsToAssigned < 2)
        consumer0.poll(50)

      assertEquals(2, listener.callsToAssigned)
      assertEquals(2, listener.callsToRevoked)
    } finally {
      consumer0.close()
    }
  }

  /** Subscribing to an empty topic list must clear the assignment. */
  @Test
  def testUnsubscribeTopic() {
    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
    this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())

    try {
      val listener = new TestConsumerReassignmentListener()
      consumer0.subscribe(List(topic), listener)

      // the initial subscription should cause a callback execution
      while (listener.callsToAssigned == 0)
        consumer0.poll(50)

      consumer0.subscribe(List())
      assertEquals(0, consumer0.assignment.size())
    } finally {
      consumer0.close()
    }
  }

  /** Adding a topic to the subscription must extend the assignment. */
  @Test
  def testExpandingTopicSubscriptions() {
    val otherTopic = "other"
    val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
    val expandedSubscriptions = subscriptions ++ Set(new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
    this.consumers(0).subscribe(List(topic))
    awaitAssignment(this.consumers(0), subscriptions)

    TestUtils.createTopic(this.zkUtils, otherTopic, 2, serverCount, this.servers)
    this.consumers(0).subscribe(List(topic, otherTopic))
    awaitAssignment(this.consumers(0), expandedSubscriptions)
  }

  /** Removing a topic from the subscription must shrink the assignment. */
  @Test
  def testShrinkingTopicSubscriptions() {
    val otherTopic = "other"
    TestUtils.createTopic(this.zkUtils, otherTopic, 2, serverCount, this.servers)
    val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
    val shrunkenSubscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
    this.consumers(0).subscribe(List(topic, otherTopic))
    awaitAssignment(this.consumers(0), subscriptions)

    this.consumers(0).subscribe(List(topic))
    awaitAssignment(this.consumers(0), shrunkenSubscriptions)
  }

  /** A paused partition must return nothing from poll(); resume() must make the records visible again. */
  @Test
  def testPartitionPauseAndResume() {
    sendRecords(5)
    this.consumers(0).assign(List(tp))
    consumeAndVerifyRecords(this.consumers(0), 5, 0)
    this.consumers(0).pause(tp)
    sendRecords(5)
    assertTrue(this.consumers(0).poll(0).isEmpty)
    this.consumers(0).resume(tp)
    consumeAndVerifyRecords(this.consumers(0), 5, 5)
  }

  @Test
  def testPauseStateNotPreservedByRebalance() {
    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
    this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())

    // this consumer is not owned by the harness, so release it even when an assertion fails
    try {
      sendRecords(5)
      consumer0.subscribe(List(topic))
      consumeAndVerifyRecords(consumer0, 5, 0)
      consumer0.pause(tp)

      // subscribe to a new topic to trigger a rebalance
      consumer0.subscribe(List("topic2"))

      // after rebalance, our position should be reset and our pause state lost,
      // so we should be able to consume from the beginning
      // NOTE(review): numRecords is 0 here, so consumeAndVerifyRecords returns without polling or
      // asserting anything, and "topic2" is not created in this test - confirm the intended check.
      consumeAndVerifyRecords(consumer0, 0, 5)
    } finally {
      consumer0.close()
    }
  }

  /** Rebalance listener that only counts how often each callback fires. */
  private class TestConsumerReassignmentListener extends ConsumerRebalanceListener {
    var callsToAssigned = 0
    var callsToRevoked = 0
    def onPartitionsAssigned(partitions: java.util.Collection[TopicPartition]) {
      info("onPartitionsAssigned called.")
      callsToAssigned += 1
    }
    def onPartitionsRevoked(partitions: java.util.Collection[TopicPartition]) {
      info("onPartitionsRevoked called.")
      callsToRevoked += 1
    }
  }

  /**
   * Polls `consumer` until its assignment equals `expected`, failing through TestUtils.waitUntilTrue otherwise.
   *
   * @param consumer the consumer to poll
   * @param expected the exact set of partitions the consumer should end up with
   */
  private def awaitAssignment(consumer: Consumer[Array[Byte], Array[Byte]], expected: Set[TopicPartition]): Unit = {
    TestUtils.waitUntilTrue(() => {
      consumer.poll(50)
      consumer.assignment() == expected.asJava
    }, s"Expected partitions ${expected.asJava} but actually got ${consumer.assignment()}")
  }

  /** Sends `numRecords` records to the default test partition `tp`. */
  private def sendRecords(numRecords: Int): Unit = {
    sendRecords(numRecords, tp)
  }

  /** Sends records keyed "key i" / valued "value i" to `tp` and blocks until every send has completed. */
  private def sendRecords(numRecords: Int, tp: TopicPartition) {
    (0 until numRecords).map { i =>
      this.producers(0).send(new ProducerRecord(tp.topic(), tp.partition(), s"key $i".getBytes, s"value $i".getBytes))
    }.foreach(_.get)
  }

  /**
   * Polls until at least `numRecords` records have arrived, then checks topic, partition, offset,
   * key and value of the first `numRecords` of them.
   *
   * @param consumer                 the consumer to poll
   * @param numRecords               how many records to wait for and verify
   * @param startingOffset           expected offset of the first record
   * @param startingKeyAndValueIndex index used in the expected "key i" / "value i" of the first record
   * @throws IllegalStateException if the records do not arrive within `numRecords * 300` polls
   */
  private def consumeAndVerifyRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int, startingOffset: Int,
                                      startingKeyAndValueIndex: Int = 0) {
    val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
    val maxIters = numRecords * 300
    var iters = 0
    while (records.size < numRecords) {
      for (record <- consumer.poll(50).asScala)
        records.add(record)
      if (iters > maxIters)
        throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.")
      iters += 1
    }
    for (i <- 0 until numRecords) {
      val record = records.get(i)
      val offset = startingOffset + i
      assertEquals(topic, record.topic())
      assertEquals(part, record.partition())
      assertEquals(offset.toLong, record.offset())
      val keyAndValueIndex = startingKeyAndValueIndex + i
      assertEquals(s"key $keyAndValueIndex", new String(record.key()))
      assertEquals(s"value $keyAndValueIndex", new String(record.value()))
    }
  }

  /**
   * Polls `consumer` until `commitCallback` has fired once more or 10 seconds have elapsed,
   * then asserts that exactly one additional callback was observed.
   */
  private def awaitCommitCallback(consumer: Consumer[Array[Byte], Array[Byte]], commitCallback: CountConsumerCommitCallback): Unit = {
    val startCount = commitCallback.count
    val started = System.currentTimeMillis()
    // poll the consumer that was passed in; the original always polled this.consumers(0)
    while (commitCallback.count == startCount && System.currentTimeMillis() - started < 10000)
      consumer.poll(50)
    assertEquals(startCount + 1, commitCallback.count)
  }

  /** Commit callback that counts completions, whether or not they carry an exception. */
  private class CountConsumerCommitCallback extends OffsetCommitCallback {
    var count = 0

    override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = count += 1
  }

}
// http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
// ----------------------------------------------------------------------
// diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
// new file mode 100644
// index 0000000..92c93e6
// --- /dev/null
// +++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
// @@ -0,0 +1,383 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.api

import java.io.File
import java.util.Properties
import java.util.concurrent.TimeUnit

import kafka.consumer.SimpleConsumer
import kafka.integration.KafkaServerTestHarness
import kafka.message.Message
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.errors.SerializationException
import org.junit.Assert._
import org.junit.{After, Before, Test}

/**
 * Integration tests for the producer send/flush/close paths, run against two brokers.
 * Two SimpleConsumers (one per broker) are used to read back what actually reached the log.
 */
abstract class BaseProducerSendTest extends KafkaServerTestHarness {

  def generateConfigs = {
    val overridingProps = new Properties()
    val numServers = 2
    overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString)
    TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol),
      trustStoreFile = trustStoreFile).map(KafkaConfig.fromProps(_, overridingProps))
  }

  private var consumer1: SimpleConsumer = null
  private var consumer2: SimpleConsumer = null

  private val topic = "topic"
  private val numRecords = 100

  @Before
  override def setUp() {
    super.setUp()

    // TODO: we need to migrate to new consumers when 0.9 is final
    consumer1 = new SimpleConsumer("localhost", servers(0).boundPort(), 100, 1024*1024, "")
    consumer2 = new SimpleConsumer("localhost", servers(1).boundPort(), 100, 1024*1024, "")
  }

  @After
  override def tearDown() {
    // The consumers are still null if setUp failed before creating them; guard against that and
    // always run the harness teardown so a failure here cannot skip it.
    try {
      if (consumer1 != null)
        consumer1.close()
      if (consumer2 != null)
        consumer2.close()
    } finally {
      super.tearDown()
    }
  }

  /** Creates a byte-array producer against `brokerList` using the harness security settings. */
  private def createProducer(brokerList: String, retries: Int = 0, lingerMs: Long = 0, props: Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] =
    TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile,
      retries = retries, lingerMs = lingerMs, props = props)

  /**
   * testSendOffset checks the basic send API behavior
   *
   * 1. Send with null key/value/partition-id should be accepted; send with null topic should be rejected.
   * 2. Last message of the non-blocking send should return the correct offset metadata
   */
  @Test
  def testSendOffset() {
    val producer = createProducer(brokerList)
    val partition = Integer.valueOf(0)

    // verifies that callbacks arrive in offset order for the single partition used below
    object callback extends Callback {
      var offset = 0L
      def onCompletion(metadata: RecordMetadata, exception: Exception) {
        if (exception == null) {
          assertEquals(offset, metadata.offset())
          assertEquals(topic, metadata.topic())
          assertEquals(partition, metadata.partition())
          offset += 1
        } else {
          fail("Send callback returns the following exception", exception)
        }
      }
    }

    try {
      // create topic
      TestUtils.createTopic(zkUtils, topic, 1, 2, servers)

      // send a normal record
      val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, "key".getBytes, "value".getBytes)
      assertEquals("Should have offset 0", 0L, producer.send(record0, callback).get.offset)

      // send a record with null value should be ok
      val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, "key".getBytes, null)
      assertEquals("Should have offset 1", 1L, producer.send(record1, callback).get.offset)

      // send a record with null key should be ok
      val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, "value".getBytes)
      assertEquals("Should have offset 2", 2L, producer.send(record2, callback).get.offset)

      // send a record with null part id should be ok
      val record3 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
      assertEquals("Should have offset 3", 3L, producer.send(record3, callback).get.offset)

      // send a record with null topic should fail
      try {
        val record4 = new ProducerRecord[Array[Byte],Array[Byte]](null, partition, "key".getBytes, "value".getBytes)
        producer.send(record4, callback)
        fail("Should not allow sending a record without topic")
      } catch {
        case iae: IllegalArgumentException => // this is ok
        case e: Throwable => fail("Only expecting IllegalArgumentException", e)
      }

      // non-blocking send a list of records
      for (_ <- 1 to numRecords)
        producer.send(record0, callback)

      // check that all messages have been acked via offset
      assertEquals("Should have offset " + (numRecords + 4), numRecords + 4L, producer.send(record0, callback).get.offset)

    } finally {
      producer.close()
    }
  }

  /** Sending byte arrays through a producer configured with StringSerializer must fail with SerializationException. */
  @Test
  def testWrongSerializer() {
    // send a record with a wrong type should receive a serialization exception
    val producer = createProducerWithWrongSerializer(brokerList)
    try {
      val record5 = new ProducerRecord[Array[Byte], Array[Byte]](topic, Integer.valueOf(0), "key".getBytes, "value".getBytes)
      producer.send(record5)
      fail("Should have gotten a SerializationException")
    } catch {
      case se: SerializationException => // this is ok
    } finally {
      // the original never closed this producer
      producer.close()
    }
  }

  /** Builds a producer whose key and value serializers are StringSerializer, mismatching the byte-array records. */
  private def createProducerWithWrongSerializer(brokerList: String) : KafkaProducer[Array[Byte],Array[Byte]] = {
    val producerProps = new Properties()
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    createProducer(brokerList, props = Some(producerProps))
  }

  /**
   * testClose checks the closing behavior
   *
   * After close() returns, all messages should be sent with correct returned offset metadata
   */
  @Test
  def testClose() {
    val producer = createProducer(brokerList)

    try {
      // create topic
      TestUtils.createTopic(zkUtils, topic, 1, 2, servers)

      // non-blocking send a list of records
      val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
      for (_ <- 1 to numRecords)
        producer.send(record0)
      val response0 = producer.send(record0)

      // close the producer
      producer.close()

      // check that all messages have been acked via offset,
      // this also checks that messages with same key go to the same partition
      assertTrue("The last message should be acked before producer is shutdown", response0.isDone)
      assertEquals("Should have offset " + numRecords, numRecords.toLong, response0.get.offset)

    } finally {
      producer.close()
    }
  }

  /**
   * testSendToPartition checks the partitioning behavior
   *
   * The specified partition-id should be respected
   */
  @Test
  def testSendToPartition() {
    val producer = createProducer(brokerList)

    try {
      // create topic
      val leaders = TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
      val partition = 1

      // make sure leaders exist
      val leader1 = leaders(partition)
      assertTrue("Leader for topic \"topic\" partition 1 should exist", leader1.isDefined)

      val responses =
        for (i <- 1 to numRecords)
          yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, ("value" + i).getBytes))
      val futures = responses.toList
      futures.foreach(_.get)
      for (future <- futures)
        assertTrue("Request should have completed", future.isDone)

      // make sure all of them end up in the same partition with increasing offset values
      for ((future, offset) <- futures zip (0 until numRecords)) {
        assertEquals(offset.toLong, future.get.offset)
        assertEquals(topic, future.get.topic)
        assertEquals(partition, future.get.partition)
      }

      // make sure the fetched messages also respect the partitioning and ordering
      val fetchResponse1 = if (leader1.get == configs(0).brokerId) {
        consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
      } else {
        consumer2.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
      }
      val messageSet1 = fetchResponse1.messageSet(topic, partition).iterator.toBuffer
      assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet1.size)

      // TODO: also check topic and partition after they are added in the return messageSet
      for (i <- 0 until numRecords) {
        assertEquals(new Message(bytes = ("value" + (i + 1)).getBytes), messageSet1(i).message)
        assertEquals(i.toLong, messageSet1(i).offset)
      }
    } finally {
      producer.close()
    }
  }

  /**
   * testAutoCreateTopic
   *
   * The topic should be created upon sending the first message
   */
  @Test
  def testAutoCreateTopic() {
    val producer = createProducer(brokerList, retries = 5)

    try {
      // Send a message to auto-create the topic
      val record = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
      assertEquals("Should have offset 0", 0L, producer.send(record).get.offset)

      // double check that the topic is created with leader elected
      TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)

    } finally {
      producer.close()
    }
  }

  /**
   * Test that flush immediately sends all accumulated requests.
   */
  @Test
  def testFlush() {
    // lingerMs = Long.MaxValue: nothing is expected to be sent until flush() is called
    val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
    try {
      TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
      val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, "value".getBytes)
      for (_ <- 0 until 50) {
        val responses = (0 until numRecords) map (_ => producer.send(record))
        assertTrue("No request is complete.", responses.forall(!_.isDone()))
        producer.flush()
        assertTrue("All requests are complete.", responses.forall(_.isDone()))
      }
    } finally {
      producer.close()
    }
  }

  /**
   * Test close with zero timeout from caller thread
   */
  @Test
  def testCloseWithZeroTimeoutFromCallerThread() {
    var producer: KafkaProducer[Array[Byte],Array[Byte]] = null
    try {
      // create topic
      val leaders = TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
      val leader0 = leaders(0)

      // create record
      val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)

      // Test closing from caller thread.
      for (_ <- 0 until 50) {
        producer = createProducer(brokerList, lingerMs = Long.MaxValue)
        val responses = (0 until numRecords) map (_ => producer.send(record0))
        assertTrue("No request is complete.", responses.forall(!_.isDone()))
        producer.close(0, TimeUnit.MILLISECONDS)
        responses.foreach { future =>
          try {
            future.get()
            fail("No message should be sent successfully.")
          } catch {
            case e: Exception =>
              assertEquals("java.lang.IllegalStateException: Producer is closed forcefully.", e.getMessage)
          }
        }
        val fetchResponse = if (leader0.get == configs(0).brokerId) {
          consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
        } else {
          consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
        }
        assertEquals("Fetch response should have no message returned.", 0, fetchResponse.messageSet(topic, 0).size)
      }
    } finally {
      if (producer != null)
        producer.close()
    }
  }

  /**
   * Test close with zero and non-zero timeout from sender thread
   */
  @Test
  def testCloseWithZeroTimeoutFromSenderThread() {
    var producer: KafkaProducer[Array[Byte],Array[Byte]] = null
    try {
      // create topic
      val leaders = TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
      val leader = leaders(0)

      // create record
      val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)

      // Test closing from sender thread.
      class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]]) extends Callback {
        override def onCompletion(metadata: RecordMetadata, exception: Exception) {
          // Trigger another batch in accumulator before close the producer. These messages should
          // not be sent.
          (0 until numRecords) map (_ => producer.send(record))
          // The close call will be called by all the message callbacks. This tests idempotence of the close call.
          producer.close(0, TimeUnit.MILLISECONDS)
          // Test close with non zero timeout. Should not block at all.
          producer.close(Long.MaxValue, TimeUnit.MICROSECONDS)
        }
      }
      for (i <- 0 until 50) {
        producer = createProducer(brokerList, lingerMs = Long.MaxValue)
        // send message to partition 0
        val responses = (0 until numRecords) map (_ => producer.send(record, new CloseCallback(producer)))
        assertTrue("No request is complete.", responses.forall(!_.isDone()))
        // flush the messages.
        producer.flush()
        assertTrue("All request are complete.", responses.forall(_.isDone()))
        // Check the messages received by broker.
        val fetchResponse = if (leader.get == configs(0).brokerId) {
          consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
        } else {
          consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
        }
        // each iteration adds one more flushed batch of numRecords to partition 0
        val expectedNumRecords = (i + 1) * numRecords
        assertEquals("Fetch response to partition 0 should have %d messages.".format(expectedNumRecords),
          expectedNumRecords, fetchResponse.messageSet(topic, 0).size)
      }
    } finally {
      if (producer != null)
        producer.close()
    }
  }
}
// http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
// ----------------------------------------------------------------------
// diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
// deleted file mode 100644
// index d973d9a..0000000
// --- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
// +++ /dev/null
// @@ -1,594 +0,0 @@
// -/**
// - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
// - * file distributed with this work for additional information regarding copyright ownership.
The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package kafka.api - -import java.util.regex.Pattern -import java.{lang, util} - -import org.apache.kafka.clients.consumer._ -import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord} -import org.apache.kafka.common.serialization.ByteArrayDeserializer -import org.apache.kafka.common.TopicPartition - -import kafka.utils.{TestUtils, Logging} -import kafka.server.KafkaConfig - -import java.util.ArrayList -import org.junit.Assert._ -import org.junit.{Test, Before} - -import scala.collection.JavaConversions._ -import scala.collection.JavaConverters._ -import kafka.coordinator.ConsumerCoordinator - - -/** - * Integration tests for the new consumer that cover basic usage as well as server failures - */ -class ConsumerTest extends IntegrationTestHarness with Logging { - - val producerCount = 1 - val consumerCount = 2 - val serverCount = 3 - - val topic = "topic" - val part = 0 - val tp = new TopicPartition(topic, part) - val part2 = 1 - val tp2 = new TopicPartition(topic, part2) - - // configure the servers and clients - this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown - this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset - this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1") - this.serverConfig.setProperty(KafkaConfig.ConsumerMinSessionTimeoutMsProp, 
"100") // set small enough session timeout - this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all") - this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test") - this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") - this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100") - - @Before - override def setUp() { - super.setUp() - - // create the test topic with all the brokers as replicas - TestUtils.createTopic(this.zkUtils, topic, 2, serverCount, this.servers) - } - - @Test - def testSimpleConsumption() { - val numRecords = 10000 - sendRecords(numRecords) - - assertEquals(0, this.consumers(0).assignment.size) - this.consumers(0).assign(List(tp)) - assertEquals(1, this.consumers(0).assignment.size) - - this.consumers(0).seek(tp, 0) - consumeRecords(this.consumers(0), numRecords = numRecords, startingOffset = 0) - - // check async commit callbacks - val commitCallback = new CountConsumerCommitCallback() - this.consumers(0).commitAsync(commitCallback) - - // shouldn't make progress until poll is invoked - Thread.sleep(10) - assertEquals(0, commitCallback.count) - awaitCommitCallback(this.consumers(0), commitCallback) - } - - @Test - def testAutoCommitOnClose() { - this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); - val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer()) - - val numRecords = 10000 - sendRecords(numRecords) - - consumer0.subscribe(List(topic)) - - val assignment = Set(tp, tp2) - TestUtils.waitUntilTrue(() => { - consumer0.poll(50) - consumer0.assignment() == assignment.asJava - }, s"Expected partitions ${assignment.asJava} but actually got ${consumer0.assignment()}") - - // should auto-commit seeked positions before closing - consumer0.seek(tp, 300) - consumer0.seek(tp2, 500) - consumer0.close() - - 
// now we should see the committed positions from another consumer - assertEquals(300, this.consumers(0).committed(tp).offset) - assertEquals(500, this.consumers(0).committed(tp2).offset) - } - - @Test - def testAutoCommitOnCloseAfterWakeup() { - this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") - val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer()) - - val numRecords = 10000 - sendRecords(numRecords) - - consumer0.subscribe(List(topic)) - - val assignment = Set(tp, tp2) - TestUtils.waitUntilTrue(() => { - consumer0.poll(50) - consumer0.assignment() == assignment.asJava - }, s"Expected partitions ${assignment.asJava} but actually got ${consumer0.assignment()}") - - // should auto-commit seeked positions before closing - consumer0.seek(tp, 300) - consumer0.seek(tp2, 500) - - // wakeup the consumer before closing to simulate trying to break a poll - // loop from another thread - consumer0.wakeup() - consumer0.close() - - // now we should see the committed positions from another consumer - assertEquals(300, this.consumers(0).committed(tp).offset) - assertEquals(500, this.consumers(0).committed(tp2).offset) - } - - @Test - def testAutoCommitOnRebalance() { - val topic2 = "topic2" - TestUtils.createTopic(this.zkUtils, topic2, 2, serverCount, this.servers) - - this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") - val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer()) - - val numRecords = 10000 - sendRecords(numRecords) - - consumer0.subscribe(List(topic)) - - val assignment = Set(tp, tp2) - TestUtils.waitUntilTrue(() => { - consumer0.poll(50) - consumer0.assignment() == assignment.asJava - }, s"Expected partitions ${assignment.asJava} but actually got ${consumer0.assignment()}") - - consumer0.seek(tp, 300) - consumer0.seek(tp2, 500) - - // change subscription to trigger rebalance - 
consumer0.subscribe(List(topic, topic2)) - - val newAssignment = Set(tp, tp2, new TopicPartition(topic2, 0), new TopicPartition(topic2, 1)) - TestUtils.waitUntilTrue(() => { - consumer0.poll(50) - consumer0.assignment() == newAssignment.asJava - }, s"Expected partitions ${newAssignment.asJava} but actually got ${consumer0.assignment()}") - - // after rebalancing, we should have reset to the committed positions - assertEquals(300, consumer0.committed(tp).offset) - assertEquals(500, consumer0.committed(tp2).offset) - } - - @Test - def testPatternSubscription() { - val numRecords = 10000 - sendRecords(numRecords) - - val topic1: String = "tblablac" // matches subscribed pattern - TestUtils.createTopic(this.zkUtils, topic1, 2, serverCount, this.servers) - sendRecords(1000, new TopicPartition(topic1, 0)) - sendRecords(1000, new TopicPartition(topic1, 1)) - - val topic2: String = "tblablak" // does not match subscribed pattern - TestUtils.createTopic(this.zkUtils, topic2, 2, serverCount, this.servers) - sendRecords(1000, new TopicPartition(topic2, 0)) - sendRecords(1000, new TopicPartition(topic2, 1)) - - val topic3: String = "tblab1" // does not match subscribed pattern - TestUtils.createTopic(this.zkUtils, topic3, 2, serverCount, this.servers) - sendRecords(1000, new TopicPartition(topic3, 0)) - sendRecords(1000, new TopicPartition(topic3, 1)) - - assertEquals(0, this.consumers(0).assignment().size) - - val pattern: Pattern = Pattern.compile("t.*c") - this.consumers(0).subscribe(pattern, new TestConsumerReassignmentListener) - this.consumers(0).poll(50) - - var subscriptions = Set( - new TopicPartition(topic, 0), - new TopicPartition(topic, 1), - new TopicPartition(topic1, 0), - new TopicPartition(topic1, 1)) - - TestUtils.waitUntilTrue(() => { - this.consumers(0).poll(50) - this.consumers(0).assignment() == subscriptions.asJava - }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment()}") - - val topic4: String = "tsomec" // 
matches subscribed pattern - TestUtils.createTopic(this.zkUtils, topic4, 2, serverCount, this.servers) - sendRecords(1000, new TopicPartition(topic4, 0)) - sendRecords(1000, new TopicPartition(topic4, 1)) - - subscriptions = subscriptions ++ Set( - new TopicPartition(topic4, 0), - new TopicPartition(topic4, 1)) - - - TestUtils.waitUntilTrue(() => { - this.consumers(0).poll(50) - this.consumers(0).assignment() == subscriptions.asJava - }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment()}") - - this.consumers(0).unsubscribe() - assertEquals(0, this.consumers(0).assignment().size) - } - - @Test - def testPatternUnsubscription() { - val numRecords = 10000 - sendRecords(numRecords) - - val topic1: String = "tblablac" // matches subscribed pattern - TestUtils.createTopic(this.zkUtils, topic1, 2, serverCount, this.servers) - sendRecords(1000, new TopicPartition(topic1, 0)) - sendRecords(1000, new TopicPartition(topic1, 1)) - - assertEquals(0, this.consumers(0).assignment().size) - - this.consumers(0).subscribe(Pattern.compile("t.*c"), new TestConsumerReassignmentListener) - this.consumers(0).poll(50) - - val subscriptions = Set( - new TopicPartition(topic, 0), - new TopicPartition(topic, 1), - new TopicPartition(topic1, 0), - new TopicPartition(topic1, 1)) - - TestUtils.waitUntilTrue(() => { - this.consumers(0).poll(50) - this.consumers(0).assignment() == subscriptions.asJava - }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment()}") - - this.consumers(0).unsubscribe() - assertEquals(0, this.consumers(0).assignment().size) - } - - @Test - def testCommitSpecifiedOffsets() { - sendRecords(5, tp) - sendRecords(7, tp2) - - this.consumers(0).assign(List(tp, tp2)); - - // Need to poll to join the group - this.consumers(0).poll(50) - val pos1 = this.consumers(0).position(tp) - val pos2 = this.consumers(0).position(tp2) - 
this.consumers(0).commitSync(Map[TopicPartition,OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava) - assertEquals(3, this.consumers(0).committed(tp).offset) - assertNull(this.consumers(0).committed(tp2)) - - // positions should not change - assertEquals(pos1, this.consumers(0).position(tp)) - assertEquals(pos2, this.consumers(0).position(tp2)) - this.consumers(0).commitSync(Map[TopicPartition,OffsetAndMetadata]((tp2, new OffsetAndMetadata(5L))).asJava) - assertEquals(3, this.consumers(0).committed(tp).offset) - assertEquals(5, this.consumers(0).committed(tp2).offset) - - // Using async should pick up the committed changes after commit completes - val commitCallback = new CountConsumerCommitCallback() - this.consumers(0).commitAsync(Map[TopicPartition,OffsetAndMetadata]((tp2, new OffsetAndMetadata(7L))).asJava, commitCallback) - awaitCommitCallback(this.consumers(0), commitCallback) - assertEquals(7, this.consumers(0).committed(tp2).offset) - } - - @Test - def testAutoOffsetReset() { - sendRecords(1) - this.consumers(0).assign(List(tp)) - consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0) - } - - @Test - def testSeek() { - val consumer = this.consumers(0) - val totalRecords = 50L - sendRecords(totalRecords.toInt) - consumer.assign(List(tp)) - - consumer.seekToEnd(tp) - assertEquals(totalRecords, consumer.position(tp)) - assertFalse(consumer.poll(totalRecords).iterator().hasNext) - - consumer.seekToBeginning(tp) - assertEquals(0, consumer.position(tp), 0) - consumeRecords(consumer, numRecords = 1, startingOffset = 0) - - val mid = totalRecords / 2 - consumer.seek(tp, mid) - assertEquals(mid, consumer.position(tp)) - consumeRecords(consumer, numRecords = 1, startingOffset = mid.toInt) - } - - @Test - def testGroupConsumption() { - sendRecords(10) - this.consumers(0).subscribe(List(topic)) - consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0) - } - - - @Test - def testCommitMetadata() { - this.consumers(0).assign(List(tp)) 
- - // sync commit - val syncMetadata = new OffsetAndMetadata(5, "foo") - this.consumers(0).commitSync(Map((tp, syncMetadata))) - assertEquals(syncMetadata, this.consumers(0).committed(tp)) - - // async commit - val asyncMetadata = new OffsetAndMetadata(10, "bar") - val callback = new CountConsumerCommitCallback - this.consumers(0).commitAsync(Map((tp, asyncMetadata)), callback) - awaitCommitCallback(this.consumers(0), callback) - - assertEquals(asyncMetadata, this.consumers(0).committed(tp)) - } - - def testPositionAndCommit() { - sendRecords(5) - - // committed() on a partition with no committed offset throws an exception - intercept[NoOffsetForPartitionException] { - this.consumers(0).committed(new TopicPartition(topic, 15)) - } - - // position() on a partition that we aren't subscribed to throws an exception - intercept[IllegalArgumentException] { - this.consumers(0).position(new TopicPartition(topic, 15)) - } - - this.consumers(0).assign(List(tp)) - - assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers(0).position(tp)) - this.consumers(0).commitSync() - assertEquals(0L, this.consumers(0).committed(tp).offset) - - consumeRecords(this.consumers(0), 5, 0) - assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers(0).position(tp)) - this.consumers(0).commitSync() - assertEquals("Committed offset should be returned", 5L, this.consumers(0).committed(tp).offset) - - sendRecords(1) - - // another consumer in the same group should get the same position - this.consumers(1).assign(List(tp)) - consumeRecords(this.consumers(1), 1, 5) - } - - @Test - def testPartitionsFor() { - val numParts = 2 - TestUtils.createTopic(this.zkUtils, "part-test", numParts, 1, this.servers) - val parts = this.consumers(0).partitionsFor("part-test") - assertNotNull(parts) - assertEquals(2, parts.size) - assertNull(this.consumers(0).partitionsFor("non-exist-topic")) - } - - @Test - def testListTopics() { - val 
numParts = 2 - val topic1: String = "part-test-topic-1" - val topic2: String = "part-test-topic-2" - val topic3: String = "part-test-topic-3" - TestUtils.createTopic(this.zkUtils, topic1, numParts, 1, this.servers) - TestUtils.createTopic(this.zkUtils, topic2, numParts, 1, this.servers) - TestUtils.createTopic(this.zkUtils, topic3, numParts, 1, this.servers) - - val topics = this.consumers.head.listTopics() - assertNotNull(topics) - assertEquals(5, topics.size()) - assertEquals(5, topics.keySet().size()) - assertEquals(2, topics.get(topic1).size) - assertEquals(2, topics.get(topic2).size) - assertEquals(2, topics.get(topic3).size) - } - - @Test - def testPartitionReassignmentCallback() { - val listener = new TestConsumerReassignmentListener() - this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test - this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30"); - val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer()) - consumer0.subscribe(List(topic), listener) - - // the initial subscription should cause a callback execution - while(listener.callsToAssigned == 0) - consumer0.poll(50) - - // get metadata for the topic - var parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName).asScala - while(parts == null) - parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName).asScala - assertEquals(1, parts.size) - assertNotNull(parts(0).leader()) - - // shutdown the coordinator - val coordinator = parts(0).leader().id() - this.servers(coordinator).shutdown() - - // this should cause another callback execution - while(listener.callsToAssigned < 2) - consumer0.poll(50) - - assertEquals(2, listener.callsToAssigned) - assertEquals(2, listener.callsToRevoked) - - consumer0.close() - } - - @Test - def testUnsubscribeTopic() { - - this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); 
// timeout quickly to avoid slow test - this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30"); - val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer()) - - try { - val listener = new TestConsumerReassignmentListener() - consumer0.subscribe(List(topic), listener) - - // the initial subscription should cause a callback execution - while (listener.callsToAssigned == 0) - consumer0.poll(50) - - consumer0.subscribe(List()) - assertEquals(0, consumer0.assignment.size()) - } finally { - consumer0.close() - } - } - - @Test - def testExpandingTopicSubscriptions() { - val otherTopic = "other" - val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1)) - val expandedSubscriptions = subscriptions ++ Set(new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1)) - this.consumers(0).subscribe(List(topic)) - TestUtils.waitUntilTrue(() => { - this.consumers(0).poll(50) - this.consumers(0).assignment == subscriptions.asJava - }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment}") - - TestUtils.createTopic(this.zkUtils, otherTopic, 2, serverCount, this.servers) - this.consumers(0).subscribe(List(topic, otherTopic)) - TestUtils.waitUntilTrue(() => { - this.consumers(0).poll(50) - this.consumers(0).assignment == expandedSubscriptions.asJava - }, s"Expected partitions ${expandedSubscriptions.asJava} but actually got ${this.consumers(0).assignment}") - } - - @Test - def testShrinkingTopicSubscriptions() { - val otherTopic = "other" - TestUtils.createTopic(this.zkUtils, otherTopic, 2, serverCount, this.servers) - val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1)) - val shrunkenSubscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1)) - this.consumers(0).subscribe(List(topic, otherTopic)) - 
TestUtils.waitUntilTrue(() => { - this.consumers(0).poll(50) - this.consumers(0).assignment == subscriptions.asJava - }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment}") - - this.consumers(0).subscribe(List(topic)) - TestUtils.waitUntilTrue(() => { - this.consumers(0).poll(50) - this.consumers(0).assignment == shrunkenSubscriptions.asJava - }, s"Expected partitions ${shrunkenSubscriptions.asJava} but actually got ${this.consumers(0).assignment}") - } - - @Test - def testPartitionPauseAndResume() { - sendRecords(5) - this.consumers(0).assign(List(tp)) - consumeRecords(this.consumers(0), 5, 0) - this.consumers(0).pause(tp) - sendRecords(5) - assertTrue(this.consumers(0).poll(0).isEmpty) - this.consumers(0).resume(tp) - consumeRecords(this.consumers(0), 5, 5) - } - - @Test - def testPauseStateNotPreservedByRebalance() { - this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test - this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30"); - val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer()) - - sendRecords(5) - consumer0.subscribe(List(topic)) - consumeRecords(consumer0, 5, 0) - consumer0.pause(tp) - - // subscribe to a new topic to trigger a rebalance - consumer0.subscribe(List("topic2")) - - // after rebalance, our position should be reset and our pause state lost, - // so we should be able to consume from the beginning - consumeRecords(consumer0, 0, 5) - } - - private class TestConsumerReassignmentListener extends ConsumerRebalanceListener { - var callsToAssigned = 0 - var callsToRevoked = 0 - def onPartitionsAssigned(partitions: java.util.Collection[TopicPartition]) { - info("onPartitionsAssigned called.") - callsToAssigned += 1 - } - def onPartitionsRevoked(partitions: java.util.Collection[TopicPartition]) { - info("onPartitionsRevoked called.") - callsToRevoked += 1 - 
} - } - - private def sendRecords(numRecords: Int): Unit = { - sendRecords(numRecords, tp) - } - - private def sendRecords(numRecords: Int, tp: TopicPartition) { - val futures = (0 until numRecords).map { i => - this.producers(0).send(new ProducerRecord(tp.topic(), tp.partition(), i.toString.getBytes, i.toString.getBytes)) - } - futures.map(_.get) - } - - private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int, startingOffset: Int) { - val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]() - val maxIters = numRecords * 300 - var iters = 0 - while (records.size < numRecords) { - for (record <- consumer.poll(50).asScala) - records.add(record) - if(iters > maxIters) - throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.") - iters += 1 - } - for (i <- 0 until numRecords) { - val record = records.get(i) - val offset = startingOffset + i - assertEquals(topic, record.topic()) - assertEquals(part, record.partition()) - assertEquals(offset.toLong, record.offset()) - } - } - - private def awaitCommitCallback(consumer: Consumer[Array[Byte], Array[Byte]], commitCallback: CountConsumerCommitCallback): Unit = { - val startCount = commitCallback.count - val started = System.currentTimeMillis() - while (commitCallback.count == startCount && System.currentTimeMillis() - started < 10000) - this.consumers(0).poll(50) - assertEquals(startCount + 1, commitCallback.count) - } - - private class CountConsumerCommitCallback extends OffsetCommitCallback { - var count = 0 - - override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = count += 1 - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala ---------------------------------------------------------------------- diff --git 
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 77fcd8b..2ec59fb 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -23,7 +23,7 @@ import kafka.utils.TestUtils import java.util.Properties import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.clients.producer.KafkaProducer -import kafka.server.{OffsetManager, KafkaConfig} +import kafka.server.KafkaConfig import kafka.integration.KafkaServerTestHarness import org.junit.{After, Before} import scala.collection.mutable.Buffer @@ -41,11 +41,12 @@ trait IntegrationTestHarness extends KafkaServerTestHarness { lazy val consumerConfig = new Properties lazy val serverConfig = new Properties - var consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]() - var producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]() + val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]() + val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]() override def generateConfigs() = { - val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect) + val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol), + trustStoreFile = trustStoreFile) cfgs.foreach(_.putAll(serverConfig)) cfgs.map(KafkaConfig.fromProps) } http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala new file mode 100644 index 0000000..335d585 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -0,0 +1,15 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package kafka.api + +class PlaintextConsumerTest extends BaseConsumerTest http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala new file mode 100644 index 0000000..d017d13 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import java.util.Properties + +import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer} +import org.apache.kafka.common.config.ConfigException +import org.apache.kafka.common.serialization.ByteArraySerializer +import org.junit.Test + +class PlaintextProducerSendTest extends BaseProducerSendTest { + + @Test + def testSerializerConstructors() { + try { + createNewProducerWithNoSerializer(brokerList) + fail("Instantiating a producer without specifying a serializer should cause a ConfigException") + } catch { + case ce : ConfigException => // this is ok + } + + // create a producer with explicit serializers should succeed + createNewProducerWithExplicitSerializer(brokerList) + } + + private def createNewProducerWithNoSerializer(brokerList: String) : KafkaProducer[Array[Byte],Array[Byte]] = { + val producerProps = new Properties() + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + return new KafkaProducer[Array[Byte],Array[Byte]](producerProps) + } + + private def createNewProducerWithExplicitSerializer(brokerList: String) : KafkaProducer[Array[Byte],Array[Byte]] = { + val producerProps = new Properties() + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + return new KafkaProducer[Array[Byte],Array[Byte]](producerProps, new ByteArraySerializer, new ByteArraySerializer) + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala deleted file mode 100644 index 3aef172..0000000 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ /dev/null @@ -1,416 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.api - -import java.util.Properties -import java.util.concurrent.TimeUnit - -import kafka.consumer.SimpleConsumer -import kafka.integration.KafkaServerTestHarness -import kafka.message.Message -import kafka.server.KafkaConfig -import kafka.utils.TestUtils -import org.apache.kafka.clients.producer._ -import org.apache.kafka.common.config.ConfigException -import org.apache.kafka.common.errors.SerializationException -import org.apache.kafka.common.serialization.ByteArraySerializer -import org.junit.Assert._ -import org.junit.{After, Before, Test} - -class ProducerSendTest extends KafkaServerTestHarness { - val numServers = 2 - - val overridingProps = new Properties() - overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString) - - def generateConfigs() = - TestUtils.createBrokerConfigs(numServers, zkConnect, false).map(KafkaConfig.fromProps(_, overridingProps)) - - private var consumer1: SimpleConsumer = null - private var consumer2: SimpleConsumer = null - - private val topic = "topic" - private val numRecords = 100 - - @Before - override def setUp() { - super.setUp() - - // TODO: we need to migrate to new consumers when 0.9 is final - consumer1 = new SimpleConsumer("localhost", servers(0).boundPort(), 100, 1024*1024, "") - consumer2 = new SimpleConsumer("localhost", servers(1).boundPort(), 100, 1024*1024, "") - } - - @After - override def tearDown() { - consumer1.close() - consumer2.close() - - super.tearDown() - } - - /** - * testSendOffset checks the basic send API behavior - * - * 1. Send with null key/value/partition-id should be accepted; send with null topic should be rejected. - * 2. 
Last message of the non-blocking send should return the correct offset metadata - */ - @Test - def testSendOffset() { - var producer = TestUtils.createNewProducer(brokerList) - val partition = new Integer(0) - - object callback extends Callback { - var offset = 0L - def onCompletion(metadata: RecordMetadata, exception: Exception) { - if (exception == null) { - assertEquals(offset, metadata.offset()) - assertEquals(topic, metadata.topic()) - assertEquals(partition, metadata.partition()) - offset += 1 - } else { - fail("Send callback returns the following exception", exception) - } - } - } - - try { - // create topic - TestUtils.createTopic(zkUtils, topic, 1, 2, servers) - - // send a normal record - val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, "key".getBytes, "value".getBytes) - assertEquals("Should have offset 0", 0L, producer.send(record0, callback).get.offset) - - // send a record with null value should be ok - val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, "key".getBytes, null) - assertEquals("Should have offset 1", 1L, producer.send(record1, callback).get.offset) - - // send a record with null key should be ok - val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, "value".getBytes) - assertEquals("Should have offset 2", 2L, producer.send(record2, callback).get.offset) - - // send a record with null part id should be ok - val record3 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes) - assertEquals("Should have offset 3", 3L, producer.send(record3, callback).get.offset) - - // send a record with null topic should fail - try { - val record4 = new ProducerRecord[Array[Byte],Array[Byte]](null, partition, "key".getBytes, "value".getBytes) - producer.send(record4, callback) - fail("Should not allow sending a record without topic") - } catch { - case iae: IllegalArgumentException => // this is ok - case e: Throwable => fail("Only expecting 
IllegalArgumentException", e) - } - - // non-blocking send a list of records - for (i <- 1 to numRecords) - producer.send(record0, callback) - - // check that all messages have been acked via offset - assertEquals("Should have offset " + (numRecords + 4), numRecords + 4L, producer.send(record0, callback).get.offset) - - } finally { - if (producer != null) { - producer.close() - producer = null - } - } - } - - @Test - def testSerializer() { - // send a record with a wrong type should receive a serialization exception - try { - val producer = createNewProducerWithWrongSerializer(brokerList) - val record5 = new ProducerRecord[Array[Byte],Array[Byte]](topic, new Integer(0), "key".getBytes, "value".getBytes) - producer.send(record5) - fail("Should have gotten a SerializationException") - } catch { - case se: SerializationException => // this is ok - } - - try { - createNewProducerWithNoSerializer(brokerList) - fail("Instantiating a producer without specifying a serializer should cause a ConfigException") - } catch { - case ce : ConfigException => // this is ok - } - - // create a producer with explicit serializers should succeed - createNewProducerWithExplicitSerializer(brokerList) - } - - private def createNewProducerWithWrongSerializer(brokerList: String) : KafkaProducer[Array[Byte],Array[Byte]] = { - import org.apache.kafka.clients.producer.ProducerConfig - - val producerProps = new Properties() - producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) - producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") - producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") - return new KafkaProducer[Array[Byte],Array[Byte]](producerProps) - } - - private def createNewProducerWithNoSerializer(brokerList: String) : KafkaProducer[Array[Byte],Array[Byte]] = { - import org.apache.kafka.clients.producer.ProducerConfig - - val 
producerProps = new Properties() - producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) - return new KafkaProducer[Array[Byte],Array[Byte]](producerProps) - } - - private def createNewProducerWithExplicitSerializer(brokerList: String) : KafkaProducer[Array[Byte],Array[Byte]] = { - import org.apache.kafka.clients.producer.ProducerConfig - - val producerProps = new Properties() - producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) - return new KafkaProducer[Array[Byte],Array[Byte]](producerProps, new ByteArraySerializer, new ByteArraySerializer) - } - - /** - * testClose checks the closing behavior - * - * After close() returns, all messages should be sent with correct returned offset metadata - */ - @Test - def testClose() { - var producer = TestUtils.createNewProducer(brokerList) - - try { - // create topic - TestUtils.createTopic(zkUtils, topic, 1, 2, servers) - - // non-blocking send a list of records - val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes) - for (i <- 1 to numRecords) - producer.send(record0) - val response0 = producer.send(record0) - - // close the producer - producer.close() - producer = null - - // check that all messages have been acked via offset, - // this also checks that messages with same key go to the same partition - assertTrue("The last message should be acked before producer is shutdown", response0.isDone) - assertEquals("Should have offset " + numRecords, numRecords.toLong, response0.get.offset) - - } finally { - if (producer != null) { - producer.close() - producer = null - } - } - } - - /** - * testSendToPartition checks the partitioning behavior - * - * The specified partition-id should be respected - */ - @Test - def testSendToPartition() { - var producer = TestUtils.createNewProducer(brokerList) - - try { - // create topic - val leaders = TestUtils.createTopic(zkUtils, topic, 2, 2, servers) - val partition = 1 - - // make sure leaders exist 
- val leader1 = leaders(partition) - assertTrue("Leader for topic \"topic\" partition 1 should exist", leader1.isDefined) - - val responses = - for (i <- 1 to numRecords) - yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, ("value" + i).getBytes)) - val futures = responses.toList - futures.map(_.get) - for (future <- futures) - assertTrue("Request should have completed", future.isDone) - - // make sure all of them end up in the same partition with increasing offset values - for ((future, offset) <- futures zip (0 until numRecords)) { - assertEquals(offset.toLong, future.get.offset) - assertEquals(topic, future.get.topic) - assertEquals(partition, future.get.partition) - } - - // make sure the fetched messages also respect the partitioning and ordering - val fetchResponse1 = if(leader1.get == configs(0).brokerId) { - consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build()) - } else { - consumer2.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build()) - } - val messageSet1 = fetchResponse1.messageSet(topic, partition).iterator.toBuffer - assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet1.size) - - // TODO: also check topic and partition after they are added in the return messageSet - for (i <- 0 to numRecords - 1) { - assertEquals(new Message(bytes = ("value" + (i + 1)).getBytes), messageSet1(i).message) - assertEquals(i.toLong, messageSet1(i).offset) - } - } finally { - if (producer != null) { - producer.close() - producer = null - } - } - } - - /** - * testAutoCreateTopic - * - * The topic should be created upon sending the first message - */ - @Test - def testAutoCreateTopic() { - var producer = TestUtils.createNewProducer(brokerList, retries = 5) - - try { - // Send a message to auto-create the topic - val record = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes) - 
assertEquals("Should have offset 0", 0L, producer.send(record).get.offset) - - // double check that the topic is created with leader elected - TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0) - - } finally { - if (producer != null) { - producer.close() - producer = null - } - } - } - - /** - * Test that flush immediately sends all accumulated requests. - */ - @Test - def testFlush() { - var producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue) - try { - TestUtils.createTopic(zkUtils, topic, 2, 2, servers) - val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, "value".getBytes) - for(i <- 0 until 50) { - val responses = (0 until numRecords) map (i => producer.send(record)) - assertTrue("No request is complete.", responses.forall(!_.isDone())) - producer.flush() - assertTrue("All requests are complete.", responses.forall(_.isDone())) - } - } finally { - if (producer != null) - producer.close() - } - } - - /** - * Test close with zero timeout from caller thread - */ - @Test - def testCloseWithZeroTimeoutFromCallerThread() { - var producer: KafkaProducer[Array[Byte],Array[Byte]] = null - try { - // create topic - val leaders = TestUtils.createTopic(zkUtils, topic, 2, 2, servers) - val leader0 = leaders(0) - val leader1 = leaders(1) - - // create record - val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes) - val record1 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 1, null, "value".getBytes) - - // Test closing from caller thread. 
- for(i <- 0 until 50) { - producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue) - val responses = (0 until numRecords) map (i => producer.send(record0)) - assertTrue("No request is complete.", responses.forall(!_.isDone())) - producer.close(0, TimeUnit.MILLISECONDS) - responses.foreach { future => - try { - future.get() - fail("No message should be sent successfully.") - } catch { - case e: Exception => - assertEquals("java.lang.IllegalStateException: Producer is closed forcefully.", e.getMessage) - } - } - val fetchResponse = if (leader0.get == configs(0).brokerId) { - consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) - } else { - consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) - } - assertEquals("Fetch response should have no message returned.", 0, fetchResponse.messageSet(topic, 0).size) - } - } finally { - if (producer != null) - producer.close() - } - } - - /** - * Test close with zero and non-zero timeout from sender thread - */ - @Test - def testCloseWithZeroTimeoutFromSenderThread() { - var producer: KafkaProducer[Array[Byte],Array[Byte]] = null - try { - // create topic - val leaders = TestUtils.createTopic(zkUtils, topic, 1, 2, servers) - val leader = leaders(0) - - // create record - val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes) - - // Test closing from sender thread. - class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]]) extends Callback { - override def onCompletion(metadata: RecordMetadata, exception: Exception) { - // Trigger another batch in accumulator before close the producer. These messages should - // not be sent. - (0 until numRecords) map (i => producer.send(record)) - // The close call will be called by all the message callbacks. This tests idempotence of the close call. - producer.close(0, TimeUnit.MILLISECONDS) - // Test close with non zero timeout. Should not block at all. 
- producer.close(Long.MaxValue, TimeUnit.MICROSECONDS) - } - } - for(i <- 0 until 50) { - producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue) - // send message to partition 0 - val responses = ((0 until numRecords) map (i => producer.send(record, new CloseCallback(producer)))) - assertTrue("No request is complete.", responses.forall(!_.isDone())) - // flush the messages. - producer.flush() - assertTrue("All request are complete.", responses.forall(_.isDone())) - // Check the messages received by broker. - val fetchResponse = if (leader.get == configs(0).brokerId) { - consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) - } else { - consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) - } - val expectedNumRecords = (i + 1) * numRecords - assertEquals("Fetch response to partition 0 should have %d messages.".format(expectedNumRecords), - expectedNumRecords, fetchResponse.messageSet(topic, 0).size) - } - } finally { - if (producer != null) - producer.close() - } - } -}