http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
new file mode 100644
index 0000000..84bebef
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -0,0 +1,596 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package kafka.api
+
+import java.util.regex.Pattern
+import java.util
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.common.TopicPartition
+
+import kafka.utils.{TestUtils, Logging}
+import kafka.server.KafkaConfig
+
+import java.util.ArrayList
+import org.junit.Assert._
+import org.junit.{Test, Before}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import kafka.coordinator.ConsumerCoordinator
+
+/**
+ * Integration tests for the new consumer that cover basic usage as well as 
server failures
+ */
+abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
+
+  val producerCount = 1
+  val consumerCount = 2
+  val serverCount = 3
+
+  val topic = "topic"
+  val part = 0
+  val tp = new TopicPartition(topic, part)
+  val part2 = 1
+  val tp2 = new TopicPartition(topic, part2)
+
+  // configure the servers and clients
+  this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, 
"false") // speed up shutdown
+  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, 
"3") // don't want to lose offset
+  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+  this.serverConfig.setProperty(KafkaConfig.ConsumerMinSessionTimeoutMsProp, 
"100") // set small enough session timeout
+  this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
+  this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
+  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest")
+  this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false")
+  this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 
"100")
+
+  @Before
+  override def setUp() {
+    super.setUp()
+
+    // create the test topic with all the brokers as replicas
+    TestUtils.createTopic(this.zkUtils, topic, 2, serverCount, this.servers)
+  }
+
+  @Test
+  def testSimpleConsumption() {
+    val numRecords = 10000
+    sendRecords(numRecords)
+
+    assertEquals(0, this.consumers(0).assignment.size)
+    this.consumers(0).assign(List(tp))
+    assertEquals(1, this.consumers(0).assignment.size)
+    
+    this.consumers(0).seek(tp, 0)
+    consumeAndVerifyRecords(this.consumers(0), numRecords = numRecords, 
startingOffset = 0)
+
+    // check async commit callbacks
+    val commitCallback = new CountConsumerCommitCallback()
+    this.consumers(0).commitAsync(commitCallback)
+
+    // shouldn't make progress until poll is invoked
+    Thread.sleep(10)
+    assertEquals(0, commitCallback.count)
+    awaitCommitCallback(this.consumers(0), commitCallback)
+  }
+
+  @Test
+  def testAutoCommitOnClose() {
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"true");
+    val consumer0 = new KafkaConsumer(this.consumerConfig, new 
ByteArrayDeserializer(), new ByteArrayDeserializer())
+
+    val numRecords = 10000
+    sendRecords(numRecords)
+
+    consumer0.subscribe(List(topic))
+
+    val assignment = Set(tp, tp2)
+    TestUtils.waitUntilTrue(() => {
+      consumer0.poll(50)
+      consumer0.assignment() == assignment.asJava
+    }, s"Expected partitions ${assignment.asJava} but actually got 
${consumer0.assignment()}")
+
+    // should auto-commit seeked positions before closing
+    consumer0.seek(tp, 300)
+    consumer0.seek(tp2, 500)
+    consumer0.close()
+
+    // now we should see the committed positions from another consumer
+    assertEquals(300, this.consumers(0).committed(tp).offset)
+    assertEquals(500, this.consumers(0).committed(tp2).offset)
+  }
+
+  @Test
+  def testAutoCommitOnCloseAfterWakeup() {
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"true")
+    val consumer0 = new KafkaConsumer(this.consumerConfig, new 
ByteArrayDeserializer(), new ByteArrayDeserializer())
+
+    val numRecords = 10000
+    sendRecords(numRecords)
+
+    consumer0.subscribe(List(topic))
+
+    val assignment = Set(tp, tp2)
+    TestUtils.waitUntilTrue(() => {
+      consumer0.poll(50)
+      consumer0.assignment() == assignment.asJava
+    }, s"Expected partitions ${assignment.asJava} but actually got 
${consumer0.assignment()}")
+
+    // should auto-commit seeked positions before closing
+    consumer0.seek(tp, 300)
+    consumer0.seek(tp2, 500)
+
+    // wakeup the consumer before closing to simulate trying to break a poll
+    // loop from another thread
+    consumer0.wakeup()
+    consumer0.close()
+
+    // now we should see the committed positions from another consumer
+    assertEquals(300, this.consumers(0).committed(tp).offset)
+    assertEquals(500, this.consumers(0).committed(tp2).offset)
+  }
+
+  @Test
+  def testAutoCommitOnRebalance() {
+    val topic2 = "topic2"
+    TestUtils.createTopic(this.zkUtils, topic2, 2, serverCount, this.servers)
+
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"true")
+    val consumer0 = new KafkaConsumer(this.consumerConfig, new 
ByteArrayDeserializer(), new ByteArrayDeserializer())
+
+    val numRecords = 10000
+    sendRecords(numRecords)
+
+    consumer0.subscribe(List(topic))
+
+    val assignment = Set(tp, tp2)
+    TestUtils.waitUntilTrue(() => {
+      consumer0.poll(50)
+      consumer0.assignment() == assignment.asJava
+    }, s"Expected partitions ${assignment.asJava} but actually got 
${consumer0.assignment()}")
+
+    consumer0.seek(tp, 300)
+    consumer0.seek(tp2, 500)
+
+    // change subscription to trigger rebalance
+    consumer0.subscribe(List(topic, topic2))
+
+    val newAssignment = Set(tp, tp2, new TopicPartition(topic2, 0), new 
TopicPartition(topic2, 1))
+    TestUtils.waitUntilTrue(() => {
+      consumer0.poll(50)
+      consumer0.assignment() == newAssignment.asJava
+    }, s"Expected partitions ${newAssignment.asJava} but actually got 
${consumer0.assignment()}")
+
+    // after rebalancing, we should have reset to the committed positions
+    assertEquals(300, consumer0.committed(tp).offset)
+    assertEquals(500, consumer0.committed(tp2).offset)
+  }
+
+  @Test
+  def testPatternSubscription() {
+    val numRecords = 10000
+    sendRecords(numRecords)
+
+    val topic1: String = "tblablac" // matches subscribed pattern
+    TestUtils.createTopic(this.zkUtils, topic1, 2, serverCount, this.servers)
+    sendRecords(1000, new TopicPartition(topic1, 0))
+    sendRecords(1000, new TopicPartition(topic1, 1))
+
+    val topic2: String = "tblablak" // does not match subscribed pattern
+    TestUtils.createTopic(this.zkUtils, topic2, 2, serverCount, this.servers)
+    sendRecords(1000, new TopicPartition(topic2, 0))
+    sendRecords(1000, new TopicPartition(topic2, 1))
+
+    val topic3: String = "tblab1" // does not match subscribed pattern
+    TestUtils.createTopic(this.zkUtils, topic3, 2, serverCount, this.servers)
+    sendRecords(1000, new TopicPartition(topic3, 0))
+    sendRecords(1000, new TopicPartition(topic3, 1))
+
+    assertEquals(0, this.consumers(0).assignment().size)
+
+    val pattern: Pattern = Pattern.compile("t.*c")
+    this.consumers(0).subscribe(pattern, new TestConsumerReassignmentListener)
+    this.consumers(0).poll(50)
+
+    var subscriptions = Set(
+      new TopicPartition(topic, 0),
+      new TopicPartition(topic, 1),
+      new TopicPartition(topic1, 0),
+      new TopicPartition(topic1, 1))
+
+    TestUtils.waitUntilTrue(() => {
+      this.consumers(0).poll(50)
+      this.consumers(0).assignment() == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got 
${this.consumers(0).assignment()}")
+
+    val topic4: String = "tsomec" // matches subscribed pattern
+    TestUtils.createTopic(this.zkUtils, topic4, 2, serverCount, this.servers)
+    sendRecords(1000, new TopicPartition(topic4, 0))
+    sendRecords(1000, new TopicPartition(topic4, 1))
+
+    subscriptions = subscriptions ++ Set(
+      new TopicPartition(topic4, 0),
+      new TopicPartition(topic4, 1))
+
+
+    TestUtils.waitUntilTrue(() => {
+      this.consumers(0).poll(50)
+      this.consumers(0).assignment() == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got 
${this.consumers(0).assignment()}")
+
+    this.consumers(0).unsubscribe()
+    assertEquals(0, this.consumers(0).assignment().size)
+  }
+
+  @Test
+  def testPatternUnsubscription() {
+    val numRecords = 10000
+    sendRecords(numRecords)
+
+    val topic1: String = "tblablac" // matches subscribed pattern
+    TestUtils.createTopic(this.zkUtils, topic1, 2, serverCount, this.servers)
+    sendRecords(1000, new TopicPartition(topic1, 0))
+    sendRecords(1000, new TopicPartition(topic1, 1))
+
+    assertEquals(0, this.consumers(0).assignment().size)
+
+    this.consumers(0).subscribe(Pattern.compile("t.*c"), new 
TestConsumerReassignmentListener)
+    this.consumers(0).poll(50)
+
+    val subscriptions = Set(
+      new TopicPartition(topic, 0),
+      new TopicPartition(topic, 1),
+      new TopicPartition(topic1, 0),
+      new TopicPartition(topic1, 1))
+
+    TestUtils.waitUntilTrue(() => {
+      this.consumers(0).poll(50)
+      this.consumers(0).assignment() == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got 
${this.consumers(0).assignment()}")
+
+    this.consumers(0).unsubscribe()
+    assertEquals(0, this.consumers(0).assignment().size)
+  }
+
+  @Test
+  def testCommitSpecifiedOffsets() {
+    sendRecords(5, tp)
+    sendRecords(7, tp2)
+
+    this.consumers(0).assign(List(tp, tp2));
+
+    // Need to poll to join the group
+    this.consumers(0).poll(50)
+    val pos1 = this.consumers(0).position(tp)
+    val pos2 = this.consumers(0).position(tp2)
+    this.consumers(0).commitSync(Map[TopicPartition,OffsetAndMetadata]((tp, 
new OffsetAndMetadata(3L))).asJava)
+    assertEquals(3, this.consumers(0).committed(tp).offset)
+    assertNull(this.consumers(0).committed(tp2))
+
+    // positions should not change
+    assertEquals(pos1, this.consumers(0).position(tp))
+    assertEquals(pos2, this.consumers(0).position(tp2))
+    this.consumers(0).commitSync(Map[TopicPartition,OffsetAndMetadata]((tp2, 
new OffsetAndMetadata(5L))).asJava)
+    assertEquals(3, this.consumers(0).committed(tp).offset)
+    assertEquals(5, this.consumers(0).committed(tp2).offset)
+
+    // Using async should pick up the committed changes after commit completes
+    val commitCallback = new CountConsumerCommitCallback()
+    this.consumers(0).commitAsync(Map[TopicPartition,OffsetAndMetadata]((tp2, 
new OffsetAndMetadata(7L))).asJava, commitCallback)
+    awaitCommitCallback(this.consumers(0), commitCallback)
+    assertEquals(7, this.consumers(0).committed(tp2).offset)
+  }
+
+  @Test
+  def testAutoOffsetReset() {
+    sendRecords(1)
+    this.consumers(0).assign(List(tp))
+    consumeAndVerifyRecords(this.consumers(0), numRecords = 1, startingOffset 
= 0)
+  }
+
+  @Test
+  def testSeek() {
+    val consumer = this.consumers(0)
+    val totalRecords = 50L
+    sendRecords(totalRecords.toInt)
+    consumer.assign(List(tp))
+
+    consumer.seekToEnd(tp)
+    assertEquals(totalRecords, consumer.position(tp))
+    assertFalse(consumer.poll(totalRecords).iterator().hasNext)
+
+    consumer.seekToBeginning(tp)
+    assertEquals(0, consumer.position(tp), 0)
+    consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0)
+
+    val mid = totalRecords / 2
+    consumer.seek(tp, mid)
+    assertEquals(mid, consumer.position(tp))
+    consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 
mid.toInt, startingKeyAndValueIndex = mid.toInt)
+  }
+
+  @Test
+  def testGroupConsumption() {
+    sendRecords(10)
+    this.consumers(0).subscribe(List(topic))
+    consumeAndVerifyRecords(this.consumers(0), numRecords = 1, startingOffset 
= 0)
+  }
+
+
+  @Test
+  def testCommitMetadata() {
+    this.consumers(0).assign(List(tp))
+
+    // sync commit
+    val syncMetadata = new OffsetAndMetadata(5, "foo")
+    this.consumers(0).commitSync(Map((tp, syncMetadata)))
+    assertEquals(syncMetadata, this.consumers(0).committed(tp))
+
+    // async commit
+    val asyncMetadata = new OffsetAndMetadata(10, "bar")
+    val callback = new CountConsumerCommitCallback
+    this.consumers(0).commitAsync(Map((tp, asyncMetadata)), callback)
+    awaitCommitCallback(this.consumers(0), callback)
+
+    assertEquals(asyncMetadata, this.consumers(0).committed(tp))
+  }
+
+  def testPositionAndCommit() {
+    sendRecords(5)
+
+    // committed() on a partition with no committed offset throws an exception
+    intercept[NoOffsetForPartitionException] {
+      this.consumers(0).committed(new TopicPartition(topic, 15))
+    }
+
+    // position() on a partition that we aren't subscribed to throws an 
exception
+    intercept[IllegalArgumentException] {
+      this.consumers(0).position(new TopicPartition(topic, 15))
+    }
+
+    this.consumers(0).assign(List(tp))
+
+    assertEquals("position() on a partition that we are subscribed to should 
reset the offset", 0L, this.consumers(0).position(tp))
+    this.consumers(0).commitSync()
+    assertEquals(0L, this.consumers(0).committed(tp).offset)
+
+    consumeAndVerifyRecords(this.consumers(0), 5, 0)
+    assertEquals("After consuming 5 records, position should be 5", 5L, 
this.consumers(0).position(tp))
+    this.consumers(0).commitSync()
+    assertEquals("Committed offset should be returned", 5L, 
this.consumers(0).committed(tp).offset)
+
+    sendRecords(1)
+
+    // another consumer in the same group should get the same position
+    this.consumers(1).assign(List(tp))
+    consumeAndVerifyRecords(this.consumers(1), 1, 5)
+  }
+
+  @Test
+  def testPartitionsFor() {
+    val numParts = 2
+    TestUtils.createTopic(this.zkUtils, "part-test", numParts, 1, this.servers)
+    val parts = this.consumers(0).partitionsFor("part-test")
+    assertNotNull(parts)
+    assertEquals(2, parts.size)
+    assertNull(this.consumers(0).partitionsFor("non-exist-topic"))
+  }
+
+  @Test
+  def testListTopics() {
+    val numParts = 2
+    val topic1: String = "part-test-topic-1"
+    val topic2: String = "part-test-topic-2"
+    val topic3: String = "part-test-topic-3"
+    TestUtils.createTopic(this.zkUtils, topic1, numParts, 1, this.servers)
+    TestUtils.createTopic(this.zkUtils, topic2, numParts, 1, this.servers)
+    TestUtils.createTopic(this.zkUtils, topic3, numParts, 1, this.servers)
+
+    val topics = this.consumers.head.listTopics()
+    assertNotNull(topics)
+    assertEquals(5, topics.size())
+    assertEquals(5, topics.keySet().size())
+    assertEquals(2, topics.get(topic1).size)
+    assertEquals(2, topics.get(topic2).size)
+    assertEquals(2, topics.get(topic3).size)
+  }
+
+  @Test
+  def testPartitionReassignmentCallback() {
+    val listener = new TestConsumerReassignmentListener()
+    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
"100"); // timeout quickly to avoid slow test
+    
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 
"30");
+    val consumer0 = new KafkaConsumer(this.consumerConfig, new 
ByteArrayDeserializer(), new ByteArrayDeserializer())
+    consumer0.subscribe(List(topic), listener)
+        
+    // the initial subscription should cause a callback execution
+    while(listener.callsToAssigned == 0)
+      consumer0.poll(50)
+    
+    // get metadata for the topic
+    var parts = 
consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName).asScala
+    while(parts == null)
+      parts = 
consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName).asScala
+    assertEquals(1, parts.size)
+    assertNotNull(parts(0).leader())
+    
+    // shutdown the coordinator
+    val coordinator = parts(0).leader().id()
+    this.servers(coordinator).shutdown()
+    
+    // this should cause another callback execution
+    while(listener.callsToAssigned < 2)
+      consumer0.poll(50)
+
+    assertEquals(2, listener.callsToAssigned)
+    assertEquals(2, listener.callsToRevoked)
+
+    consumer0.close()
+  }
+
+  @Test
+  def testUnsubscribeTopic() {
+
+    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
"100"); // timeout quickly to avoid slow test
+    
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 
"30");
+    val consumer0 = new KafkaConsumer(this.consumerConfig, new 
ByteArrayDeserializer(), new ByteArrayDeserializer())
+
+    try {
+      val listener = new TestConsumerReassignmentListener()
+      consumer0.subscribe(List(topic), listener)
+
+      // the initial subscription should cause a callback execution
+      while (listener.callsToAssigned == 0)
+        consumer0.poll(50)
+
+      consumer0.subscribe(List())
+      assertEquals(0, consumer0.assignment.size())
+    } finally {
+      consumer0.close()
+    }
+  }
+
+  @Test
+  def testExpandingTopicSubscriptions() {
+    val otherTopic = "other"
+    val subscriptions = Set(new TopicPartition(topic, 0), new 
TopicPartition(topic, 1))
+    val expandedSubscriptions = subscriptions ++ Set(new 
TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
+    this.consumers(0).subscribe(List(topic))
+    TestUtils.waitUntilTrue(() => {
+      this.consumers(0).poll(50)
+      this.consumers(0).assignment == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got 
${this.consumers(0).assignment}")
+
+    TestUtils.createTopic(this.zkUtils, otherTopic, 2, serverCount, 
this.servers)
+    this.consumers(0).subscribe(List(topic, otherTopic))
+    TestUtils.waitUntilTrue(() => {
+      this.consumers(0).poll(50)
+      this.consumers(0).assignment == expandedSubscriptions.asJava
+    }, s"Expected partitions ${expandedSubscriptions.asJava} but actually got 
${this.consumers(0).assignment}")
+  }
+
+  @Test
+  def testShrinkingTopicSubscriptions() {
+    val otherTopic = "other"
+    TestUtils.createTopic(this.zkUtils, otherTopic, 2, serverCount, 
this.servers)
+    val subscriptions = Set(new TopicPartition(topic, 0), new 
TopicPartition(topic, 1), new TopicPartition(otherTopic, 0), new 
TopicPartition(otherTopic, 1))
+    val shrunkenSubscriptions = Set(new TopicPartition(topic, 0), new 
TopicPartition(topic, 1))
+    this.consumers(0).subscribe(List(topic, otherTopic))
+    TestUtils.waitUntilTrue(() => {
+      this.consumers(0).poll(50)
+      this.consumers(0).assignment == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got 
${this.consumers(0).assignment}")
+
+    this.consumers(0).subscribe(List(topic))
+    TestUtils.waitUntilTrue(() => {
+      this.consumers(0).poll(50)
+      this.consumers(0).assignment == shrunkenSubscriptions.asJava
+    }, s"Expected partitions ${shrunkenSubscriptions.asJava} but actually got 
${this.consumers(0).assignment}")
+  }
+
+  @Test
+  def testPartitionPauseAndResume() {
+    sendRecords(5)
+    this.consumers(0).assign(List(tp))
+    consumeAndVerifyRecords(this.consumers(0), 5, 0)
+    this.consumers(0).pause(tp)
+    sendRecords(5)
+    assertTrue(this.consumers(0).poll(0).isEmpty)
+    this.consumers(0).resume(tp)
+    consumeAndVerifyRecords(this.consumers(0), 5, 5)
+  }
+
+  @Test
+  def testPauseStateNotPreservedByRebalance() {
+    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
"100"); // timeout quickly to avoid slow test
+    
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 
"30");
+    val consumer0 = new KafkaConsumer(this.consumerConfig, new 
ByteArrayDeserializer(), new ByteArrayDeserializer())
+
+    sendRecords(5)
+    consumer0.subscribe(List(topic))
+    consumeAndVerifyRecords(consumer0, 5, 0)
+    consumer0.pause(tp)
+
+    // subscribe to a new topic to trigger a rebalance
+    consumer0.subscribe(List("topic2"))
+
+    // after rebalance, our position should be reset and our pause state lost,
+    // so we should be able to consume from the beginning
+    consumeAndVerifyRecords(consumer0, 0, 5)
+  }
+
+  private class TestConsumerReassignmentListener extends 
ConsumerRebalanceListener {
+    var callsToAssigned = 0
+    var callsToRevoked = 0
+    def onPartitionsAssigned(partitions: java.util.Collection[TopicPartition]) 
{
+      info("onPartitionsAssigned called.")
+      callsToAssigned += 1
+    }
+    def onPartitionsRevoked(partitions: java.util.Collection[TopicPartition]) {
+      info("onPartitionsRevoked called.")
+      callsToRevoked += 1
+    }
+  }
+
+  private def sendRecords(numRecords: Int): Unit = {
+    sendRecords(numRecords, tp)
+  }
+
+  private def sendRecords(numRecords: Int, tp: TopicPartition) {
+    (0 until numRecords).map { i =>
+      this.producers(0).send(new ProducerRecord(tp.topic(), tp.partition(), 
s"key $i".getBytes, s"value $i".getBytes))
+    }.foreach(_.get)
+  }
+
+  private def consumeAndVerifyRecords(consumer: Consumer[Array[Byte], 
Array[Byte]], numRecords: Int, startingOffset: Int,
+                                      startingKeyAndValueIndex: Int = 0) {
+    val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
+    val maxIters = numRecords * 300
+    var iters = 0
+    while (records.size < numRecords) {
+      for (record <- consumer.poll(50).asScala)
+        records.add(record)
+      if (iters > maxIters)
+        throw new IllegalStateException("Failed to consume the expected 
records after " + iters + " iterations.")
+      iters += 1
+    }
+    for (i <- 0 until numRecords) {
+      val record = records.get(i)
+      val offset = startingOffset + i
+      assertEquals(topic, record.topic())
+      assertEquals(part, record.partition())
+      assertEquals(offset.toLong, record.offset())
+      val keyAndValueIndex = startingKeyAndValueIndex + i
+      assertEquals(s"key $keyAndValueIndex", new String(record.key()))
+      assertEquals(s"value $keyAndValueIndex", new String(record.value()))
+    }
+  }
+
+  private def awaitCommitCallback(consumer: Consumer[Array[Byte], 
Array[Byte]], commitCallback: CountConsumerCommitCallback): Unit = {
+    val startCount = commitCallback.count
+    val started = System.currentTimeMillis()
+    while (commitCallback.count == startCount && System.currentTimeMillis() - 
started < 10000)
+      this.consumers(0).poll(50)
+    assertEquals(startCount + 1, commitCallback.count)
+  }
+
+  private class CountConsumerCommitCallback extends OffsetCommitCallback {
+    var count = 0
+
+    override def onComplete(offsets: util.Map[TopicPartition, 
OffsetAndMetadata], exception: Exception): Unit = count += 1
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
new file mode 100644
index 0000000..92c93e6
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.io.File
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import kafka.consumer.SimpleConsumer
+import kafka.integration.KafkaServerTestHarness
+import kafka.message.Message
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer._
+import org.apache.kafka.common.errors.SerializationException
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+abstract class BaseProducerSendTest extends KafkaServerTestHarness {
+
+  def generateConfigs = {
+    val overridingProps = new Properties()
+    val numServers = 2
+    overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString)
+    TestUtils.createBrokerConfigs(numServers, zkConnect, false, 
interBrokerSecurityProtocol = Some(securityProtocol),
+      trustStoreFile = trustStoreFile).map(KafkaConfig.fromProps(_, 
overridingProps))
+  }
+
+  private var consumer1: SimpleConsumer = null
+  private var consumer2: SimpleConsumer = null
+
+  private val topic = "topic"
+  private val numRecords = 100
+
+  @Before
+  override def setUp() {
+    super.setUp()
+
+    // TODO: we need to migrate to new consumers when 0.9 is final
+    consumer1 = new SimpleConsumer("localhost", servers(0).boundPort(), 100, 
1024*1024, "")
+    consumer2 = new SimpleConsumer("localhost", servers(1).boundPort(), 100, 
1024*1024, "")
+  }
+
+  @After
+  override def tearDown() {
+    consumer1.close()
+    consumer2.close()
+
+    super.tearDown()
+  }
+
+  private def createProducer(brokerList: String, retries: Int = 0, lingerMs: 
Long = 0, props: Option[Properties] = None): 
KafkaProducer[Array[Byte],Array[Byte]] =
+    TestUtils.createNewProducer(brokerList, securityProtocol = 
securityProtocol, trustStoreFile = trustStoreFile,
+      retries = retries, lingerMs = lingerMs, props = props)
+
+  /**
+   * testSendOffset checks the basic send API behavior
+   *
+   * 1. Send with null key/value/partition-id should be accepted; send with 
null topic should be rejected.
+   * 2. Last message of the non-blocking send should return the correct offset 
metadata
+   */
+  @Test
+  def testSendOffset() {
+    val producer = createProducer(brokerList)
+    val partition = new Integer(0)
+
+    object callback extends Callback {
+      var offset = 0L
+      def onCompletion(metadata: RecordMetadata, exception: Exception) {
+        if (exception == null) {
+          assertEquals(offset, metadata.offset())
+          assertEquals(topic, metadata.topic())
+          assertEquals(partition, metadata.partition())
+          offset += 1
+        } else {
+          fail("Send callback returns the following exception", exception)
+        }
+      }
+    }
+
+    try {
+      // create topic
+      TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
+
+      // send a normal record
+      val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, 
partition, "key".getBytes, "value".getBytes)
+      assertEquals("Should have offset 0", 0L, producer.send(record0, 
callback).get.offset)
+
+      // send a record with null value should be ok
+      val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic, 
partition, "key".getBytes, null)
+      assertEquals("Should have offset 1", 1L, producer.send(record1, 
callback).get.offset)
+
+      // send a record with null key should be ok
+      val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic, 
partition, null, "value".getBytes)
+      assertEquals("Should have offset 2", 2L, producer.send(record2, 
callback).get.offset)
+
+      // send a record with null part id should be ok
+      val record3 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, 
"key".getBytes, "value".getBytes)
+      assertEquals("Should have offset 3", 3L, producer.send(record3, 
callback).get.offset)
+
+      // send a record with null topic should fail
+      try {
+        val record4 = new ProducerRecord[Array[Byte],Array[Byte]](null, 
partition, "key".getBytes, "value".getBytes)
+        producer.send(record4, callback)
+        fail("Should not allow sending a record without topic")
+      } catch {
+        case iae: IllegalArgumentException => // this is ok
+        case e: Throwable => fail("Only expecting IllegalArgumentException", e)
+      }
+
+      // non-blocking send a list of records
+      for (i <- 1 to numRecords)
+        producer.send(record0, callback)
+
+      // check that all messages have been acked via offset
+      assertEquals("Should have offset " + (numRecords + 4), numRecords + 4L, 
producer.send(record0, callback).get.offset)
+
+    } finally {
+      if (producer != null)
+        producer.close()
+    }
+  }
+
+  @Test
+  def testWrongSerializer() {
+    // send a record with a wrong type should receive a serialization exception
+    try {
+      val producer = createProducerWithWrongSerializer(brokerList)
+      val record5 = new ProducerRecord[Array[Byte], Array[Byte]](topic, new 
Integer(0), "key".getBytes, "value".getBytes)
+      producer.send(record5)
+      fail("Should have gotten a SerializationException")
+    } catch {
+      case se: SerializationException => // this is ok
+    }
+  }
+
+  private def createProducerWithWrongSerializer(brokerList: String) : 
KafkaProducer[Array[Byte],Array[Byte]] = {
+    val producerProps = new Properties()
+    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+    createProducer(brokerList, props = Some(producerProps))
+  }
+
+  /**
+   * testClose checks the closing behavior
+   *
+   * After close() returns, all messages should be sent with correct returned 
offset metadata
+   */
+  @Test
+  def testClose() {
+    val producer = createProducer(brokerList)
+
+    try {
+      // create topic
+      TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
+
+      // non-blocking send a list of records
+      val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, 
"key".getBytes, "value".getBytes)
+      for (i <- 1 to numRecords)
+        producer.send(record0)
+      val response0 = producer.send(record0)
+
+      // close the producer
+      producer.close()
+
+      // check that all messages have been acked via offset,
+      // this also checks that messages with same key go to the same partition
+      assertTrue("The last message should be acked before producer is 
shutdown", response0.isDone)
+      assertEquals("Should have offset " + numRecords, numRecords.toLong, 
response0.get.offset)
+
+    } finally {
+      if (producer != null)
+        producer.close()
+    }
+  }
+
+  /**
+   * testSendToPartition checks the partitioning behavior
+   *
+   * The specified partition-id should be respected
+   */
+  @Test
+  def testSendToPartition() {
+    val producer = createProducer(brokerList)
+
+    try {
+      // create topic
+      val leaders = TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
+      val partition = 1
+
+      // make sure leaders exist
+      val leader1 = leaders(partition)
+      assertTrue("Leader for topic \"topic\" partition 1 should exist", 
leader1.isDefined)
+
+      val responses =
+        for (i <- 1 to numRecords)
+          yield producer.send(new 
ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, ("value" + 
i).getBytes))
+      val futures = responses.toList
+      futures.foreach(_.get)
+      for (future <- futures)
+        assertTrue("Request should have completed", future.isDone)
+
+      // make sure all of them end up in the same partition with increasing 
offset values
+      for ((future, offset) <- futures zip (0 until numRecords)) {
+        assertEquals(offset.toLong, future.get.offset)
+        assertEquals(topic, future.get.topic)
+        assertEquals(partition, future.get.partition)
+      }
+
+      // make sure the fetched messages also respect the partitioning and 
ordering
+      val fetchResponse1 = if(leader1.get == configs(0).brokerId) {
+        consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 
0, Int.MaxValue).build())
+      } else {
+        consumer2.fetch(new FetchRequestBuilder().addFetch(topic, partition, 
0, Int.MaxValue).build())
+      }
+      val messageSet1 = fetchResponse1.messageSet(topic, 
partition).iterator.toBuffer
+      assertEquals("Should have fetched " + numRecords + " messages", 
numRecords, messageSet1.size)
+
+      // TODO: also check topic and partition after they are added in the 
return messageSet
+      for (i <- 0 to numRecords - 1) {
+        assertEquals(new Message(bytes = ("value" + (i + 1)).getBytes), 
messageSet1(i).message)
+        assertEquals(i.toLong, messageSet1(i).offset)
+      }
+    } finally {
+      if (producer != null)
+        producer.close()
+    }
+  }
+
+  /**
+   * testAutoCreateTopic
+   *
+   * The topic should be created upon sending the first message
+   */
+  @Test
+  def testAutoCreateTopic() {
+    val producer = createProducer(brokerList, retries = 5)
+
+    try {
+      // Send a message to auto-create the topic
+      val record = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, 
"key".getBytes, "value".getBytes)
+      assertEquals("Should have offset 0", 0L, 
producer.send(record).get.offset)
+
+      // double check that the topic is created with leader elected
+      TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
+
+    } finally {
+      if (producer != null) {
+        producer.close()
+      }
+    }
+  }
+
+  /**
+   * Test that flush immediately sends all accumulated requests.
+   */
+  @Test
+  def testFlush() {
+    val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
+    try {
+      TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
+      val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, 
"value".getBytes)
+      for(i <- 0 until 50) {
+        val responses = (0 until numRecords) map (i => producer.send(record))
+        assertTrue("No request is complete.", responses.forall(!_.isDone()))
+        producer.flush()
+        assertTrue("All requests are complete.", responses.forall(_.isDone()))
+      }
+    } finally {
+      if (producer != null)
+        producer.close()
+    }
+  }
+
+  /**
+   * Test close with zero timeout from caller thread
+   */
+  @Test
+  def testCloseWithZeroTimeoutFromCallerThread() {
+    var producer: KafkaProducer[Array[Byte],Array[Byte]] = null
+    try {
+      // create topic
+      val leaders = TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
+      val leader0 = leaders(0)
+      val leader1 = leaders(1)
+
+      // create record
+      val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, 
null, "value".getBytes)
+      val record1 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 1, 
null, "value".getBytes)
+
+      // Test closing from caller thread.
+      for(i <- 0 until 50) {
+        producer = createProducer(brokerList, lingerMs = Long.MaxValue)
+        val responses = (0 until numRecords) map (i => producer.send(record0))
+        assertTrue("No request is complete.", responses.forall(!_.isDone()))
+        producer.close(0, TimeUnit.MILLISECONDS)
+        responses.foreach { future =>
+          try {
+            future.get()
+            fail("No message should be sent successfully.")
+          } catch {
+            case e: Exception =>
+              assertEquals("java.lang.IllegalStateException: Producer is 
closed forcefully.", e.getMessage)
+          }
+        }
+        val fetchResponse = if (leader0.get == configs(0).brokerId) {
+          consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 
Int.MaxValue).build())
+        } else {
+          consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 
Int.MaxValue).build())
+        }
+        assertEquals("Fetch response should have no message returned.", 0, 
fetchResponse.messageSet(topic, 0).size)
+      }
+    } finally {
+      if (producer != null)
+        producer.close()
+    }
+  }
+
+  /**
+   * Test close with zero and non-zero timeout from sender thread
+   */
+  @Test
+  def testCloseWithZeroTimeoutFromSenderThread() {
+    var producer: KafkaProducer[Array[Byte],Array[Byte]] = null
+    try {
+      // create topic
+      val leaders = TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
+      val leader = leaders(0)
+
+      // create record
+      val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, 
null, "value".getBytes)
+
+      // Test closing from sender thread.
+      class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]]) 
extends Callback {
+        override def onCompletion(metadata: RecordMetadata, exception: 
Exception) {
+          // Trigger another batch in accumulator before close the producer. 
These messages should
+          // not be sent.
+          (0 until numRecords) map (i => producer.send(record))
+          // The close call will be called by all the message callbacks. This 
tests idempotence of the close call.
+          producer.close(0, TimeUnit.MILLISECONDS)
+          // Test close with non zero timeout. Should not block at all.
+          producer.close(Long.MaxValue, TimeUnit.MICROSECONDS)
+        }
+      }
+      for(i <- 0 until 50) {
+        producer = createProducer(brokerList, lingerMs = Long.MaxValue)
+        // send message to partition 0
+        val responses = ((0 until numRecords) map (i => producer.send(record, 
new CloseCallback(producer))))
+        assertTrue("No request is complete.", responses.forall(!_.isDone()))
+        // flush the messages.
+        producer.flush()
+        assertTrue("All request are complete.", responses.forall(_.isDone()))
+        // Check the messages received by broker.
+        val fetchResponse = if (leader.get == configs(0).brokerId) {
+          consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 
Int.MaxValue).build())
+        } else {
+          consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 
Int.MaxValue).build())
+        }
+        val expectedNumRecords = (i + 1) * numRecords
+        assertEquals("Fetch response to partition 0 should have %d 
messages.".format(expectedNumRecords),
+          expectedNumRecords, fetchResponse.messageSet(topic, 0).size)
+      }
+    } finally {
+      if (producer != null)
+        producer.close()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
deleted file mode 100644
index d973d9a..0000000
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ /dev/null
@@ -1,594 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package kafka.api
-
-import java.util.regex.Pattern
-import java.{lang, util}
-
-import org.apache.kafka.clients.consumer._
-import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import org.apache.kafka.common.TopicPartition
-
-import kafka.utils.{TestUtils, Logging}
-import kafka.server.KafkaConfig
-
-import java.util.ArrayList
-import org.junit.Assert._
-import org.junit.{Test, Before}
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-import kafka.coordinator.ConsumerCoordinator
-
-
-/**
- * Integration tests for the new consumer that cover basic usage as well as 
server failures
- */
-class ConsumerTest extends IntegrationTestHarness with Logging {
-
-  val producerCount = 1
-  val consumerCount = 2
-  val serverCount = 3
-
-  val topic = "topic"
-  val part = 0
-  val tp = new TopicPartition(topic, part)
-  val part2 = 1
-  val tp2 = new TopicPartition(topic, part2)
-
-  // configure the servers and clients
-  this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, 
"false") // speed up shutdown
-  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, 
"3") // don't want to lose offset
-  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
-  this.serverConfig.setProperty(KafkaConfig.ConsumerMinSessionTimeoutMsProp, 
"100") // set small enough session timeout
-  this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
-  this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
-  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest")
-  this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false")
-  this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 
"100")
-
-  @Before
-  override def setUp() {
-    super.setUp()
-
-    // create the test topic with all the brokers as replicas
-    TestUtils.createTopic(this.zkUtils, topic, 2, serverCount, this.servers)
-  }
-
-  @Test
-  def testSimpleConsumption() {
-    val numRecords = 10000
-    sendRecords(numRecords)
-
-    assertEquals(0, this.consumers(0).assignment.size)
-    this.consumers(0).assign(List(tp))
-    assertEquals(1, this.consumers(0).assignment.size)
-    
-    this.consumers(0).seek(tp, 0)
-    consumeRecords(this.consumers(0), numRecords = numRecords, startingOffset 
= 0)
-
-    // check async commit callbacks
-    val commitCallback = new CountConsumerCommitCallback()
-    this.consumers(0).commitAsync(commitCallback)
-
-    // shouldn't make progress until poll is invoked
-    Thread.sleep(10)
-    assertEquals(0, commitCallback.count)
-    awaitCommitCallback(this.consumers(0), commitCallback)
-  }
-
-  @Test
-  def testAutoCommitOnClose() {
-    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"true");
-    val consumer0 = new KafkaConsumer(this.consumerConfig, new 
ByteArrayDeserializer(), new ByteArrayDeserializer())
-
-    val numRecords = 10000
-    sendRecords(numRecords)
-
-    consumer0.subscribe(List(topic))
-
-    val assignment = Set(tp, tp2)
-    TestUtils.waitUntilTrue(() => {
-      consumer0.poll(50)
-      consumer0.assignment() == assignment.asJava
-    }, s"Expected partitions ${assignment.asJava} but actually got 
${consumer0.assignment()}")
-
-    // should auto-commit seeked positions before closing
-    consumer0.seek(tp, 300)
-    consumer0.seek(tp2, 500)
-    consumer0.close()
-
-    // now we should see the committed positions from another consumer
-    assertEquals(300, this.consumers(0).committed(tp).offset)
-    assertEquals(500, this.consumers(0).committed(tp2).offset)
-  }
-
-  @Test
-  def testAutoCommitOnCloseAfterWakeup() {
-    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"true")
-    val consumer0 = new KafkaConsumer(this.consumerConfig, new 
ByteArrayDeserializer(), new ByteArrayDeserializer())
-
-    val numRecords = 10000
-    sendRecords(numRecords)
-
-    consumer0.subscribe(List(topic))
-
-    val assignment = Set(tp, tp2)
-    TestUtils.waitUntilTrue(() => {
-      consumer0.poll(50)
-      consumer0.assignment() == assignment.asJava
-    }, s"Expected partitions ${assignment.asJava} but actually got 
${consumer0.assignment()}")
-
-    // should auto-commit seeked positions before closing
-    consumer0.seek(tp, 300)
-    consumer0.seek(tp2, 500)
-
-    // wakeup the consumer before closing to simulate trying to break a poll
-    // loop from another thread
-    consumer0.wakeup()
-    consumer0.close()
-
-    // now we should see the committed positions from another consumer
-    assertEquals(300, this.consumers(0).committed(tp).offset)
-    assertEquals(500, this.consumers(0).committed(tp2).offset)
-  }
-
-  @Test
-  def testAutoCommitOnRebalance() {
-    val topic2 = "topic2"
-    TestUtils.createTopic(this.zkUtils, topic2, 2, serverCount, this.servers)
-
-    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"true")
-    val consumer0 = new KafkaConsumer(this.consumerConfig, new 
ByteArrayDeserializer(), new ByteArrayDeserializer())
-
-    val numRecords = 10000
-    sendRecords(numRecords)
-
-    consumer0.subscribe(List(topic))
-
-    val assignment = Set(tp, tp2)
-    TestUtils.waitUntilTrue(() => {
-      consumer0.poll(50)
-      consumer0.assignment() == assignment.asJava
-    }, s"Expected partitions ${assignment.asJava} but actually got 
${consumer0.assignment()}")
-
-    consumer0.seek(tp, 300)
-    consumer0.seek(tp2, 500)
-
-    // change subscription to trigger rebalance
-    consumer0.subscribe(List(topic, topic2))
-
-    val newAssignment = Set(tp, tp2, new TopicPartition(topic2, 0), new 
TopicPartition(topic2, 1))
-    TestUtils.waitUntilTrue(() => {
-      consumer0.poll(50)
-      consumer0.assignment() == newAssignment.asJava
-    }, s"Expected partitions ${newAssignment.asJava} but actually got 
${consumer0.assignment()}")
-
-    // after rebalancing, we should have reset to the committed positions
-    assertEquals(300, consumer0.committed(tp).offset)
-    assertEquals(500, consumer0.committed(tp2).offset)
-  }
-
-  @Test
-  def testPatternSubscription() {
-    val numRecords = 10000
-    sendRecords(numRecords)
-
-    val topic1: String = "tblablac" // matches subscribed pattern
-    TestUtils.createTopic(this.zkUtils, topic1, 2, serverCount, this.servers)
-    sendRecords(1000, new TopicPartition(topic1, 0))
-    sendRecords(1000, new TopicPartition(topic1, 1))
-
-    val topic2: String = "tblablak" // does not match subscribed pattern
-    TestUtils.createTopic(this.zkUtils, topic2, 2, serverCount, this.servers)
-    sendRecords(1000, new TopicPartition(topic2, 0))
-    sendRecords(1000, new TopicPartition(topic2, 1))
-
-    val topic3: String = "tblab1" // does not match subscribed pattern
-    TestUtils.createTopic(this.zkUtils, topic3, 2, serverCount, this.servers)
-    sendRecords(1000, new TopicPartition(topic3, 0))
-    sendRecords(1000, new TopicPartition(topic3, 1))
-
-    assertEquals(0, this.consumers(0).assignment().size)
-
-    val pattern: Pattern = Pattern.compile("t.*c")
-    this.consumers(0).subscribe(pattern, new TestConsumerReassignmentListener)
-    this.consumers(0).poll(50)
-
-    var subscriptions = Set(
-      new TopicPartition(topic, 0),
-      new TopicPartition(topic, 1),
-      new TopicPartition(topic1, 0),
-      new TopicPartition(topic1, 1))
-
-    TestUtils.waitUntilTrue(() => {
-      this.consumers(0).poll(50)
-      this.consumers(0).assignment() == subscriptions.asJava
-    }, s"Expected partitions ${subscriptions.asJava} but actually got 
${this.consumers(0).assignment()}")
-
-    val topic4: String = "tsomec" // matches subscribed pattern
-    TestUtils.createTopic(this.zkUtils, topic4, 2, serverCount, this.servers)
-    sendRecords(1000, new TopicPartition(topic4, 0))
-    sendRecords(1000, new TopicPartition(topic4, 1))
-
-    subscriptions = subscriptions ++ Set(
-      new TopicPartition(topic4, 0),
-      new TopicPartition(topic4, 1))
-
-
-    TestUtils.waitUntilTrue(() => {
-      this.consumers(0).poll(50)
-      this.consumers(0).assignment() == subscriptions.asJava
-    }, s"Expected partitions ${subscriptions.asJava} but actually got 
${this.consumers(0).assignment()}")
-
-    this.consumers(0).unsubscribe()
-    assertEquals(0, this.consumers(0).assignment().size)
-  }
-
-  @Test
-  def testPatternUnsubscription() {
-    val numRecords = 10000
-    sendRecords(numRecords)
-
-    val topic1: String = "tblablac" // matches subscribed pattern
-    TestUtils.createTopic(this.zkUtils, topic1, 2, serverCount, this.servers)
-    sendRecords(1000, new TopicPartition(topic1, 0))
-    sendRecords(1000, new TopicPartition(topic1, 1))
-
-    assertEquals(0, this.consumers(0).assignment().size)
-
-    this.consumers(0).subscribe(Pattern.compile("t.*c"), new 
TestConsumerReassignmentListener)
-    this.consumers(0).poll(50)
-
-    val subscriptions = Set(
-      new TopicPartition(topic, 0),
-      new TopicPartition(topic, 1),
-      new TopicPartition(topic1, 0),
-      new TopicPartition(topic1, 1))
-
-    TestUtils.waitUntilTrue(() => {
-      this.consumers(0).poll(50)
-      this.consumers(0).assignment() == subscriptions.asJava
-    }, s"Expected partitions ${subscriptions.asJava} but actually got 
${this.consumers(0).assignment()}")
-
-    this.consumers(0).unsubscribe()
-    assertEquals(0, this.consumers(0).assignment().size)
-  }
-
-  @Test
-  def testCommitSpecifiedOffsets() {
-    sendRecords(5, tp)
-    sendRecords(7, tp2)
-
-    this.consumers(0).assign(List(tp, tp2));
-
-    // Need to poll to join the group
-    this.consumers(0).poll(50)
-    val pos1 = this.consumers(0).position(tp)
-    val pos2 = this.consumers(0).position(tp2)
-    this.consumers(0).commitSync(Map[TopicPartition,OffsetAndMetadata]((tp, 
new OffsetAndMetadata(3L))).asJava)
-    assertEquals(3, this.consumers(0).committed(tp).offset)
-    assertNull(this.consumers(0).committed(tp2))
-
-    // positions should not change
-    assertEquals(pos1, this.consumers(0).position(tp))
-    assertEquals(pos2, this.consumers(0).position(tp2))
-    this.consumers(0).commitSync(Map[TopicPartition,OffsetAndMetadata]((tp2, 
new OffsetAndMetadata(5L))).asJava)
-    assertEquals(3, this.consumers(0).committed(tp).offset)
-    assertEquals(5, this.consumers(0).committed(tp2).offset)
-
-    // Using async should pick up the committed changes after commit completes
-    val commitCallback = new CountConsumerCommitCallback()
-    this.consumers(0).commitAsync(Map[TopicPartition,OffsetAndMetadata]((tp2, 
new OffsetAndMetadata(7L))).asJava, commitCallback)
-    awaitCommitCallback(this.consumers(0), commitCallback)
-    assertEquals(7, this.consumers(0).committed(tp2).offset)
-  }
-
-  @Test
-  def testAutoOffsetReset() {
-    sendRecords(1)
-    this.consumers(0).assign(List(tp))
-    consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
-  }
-
-  @Test
-  def testSeek() {
-    val consumer = this.consumers(0)
-    val totalRecords = 50L
-    sendRecords(totalRecords.toInt)
-    consumer.assign(List(tp))
-
-    consumer.seekToEnd(tp)
-    assertEquals(totalRecords, consumer.position(tp))
-    assertFalse(consumer.poll(totalRecords).iterator().hasNext)
-
-    consumer.seekToBeginning(tp)
-    assertEquals(0, consumer.position(tp), 0)
-    consumeRecords(consumer, numRecords = 1, startingOffset = 0)
-
-    val mid = totalRecords / 2
-    consumer.seek(tp, mid)
-    assertEquals(mid, consumer.position(tp))
-    consumeRecords(consumer, numRecords = 1, startingOffset = mid.toInt)
-  }
-
-  @Test
-  def testGroupConsumption() {
-    sendRecords(10)
-    this.consumers(0).subscribe(List(topic))
-    consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
-  }
-
-
-  @Test
-  def testCommitMetadata() {
-    this.consumers(0).assign(List(tp))
-
-    // sync commit
-    val syncMetadata = new OffsetAndMetadata(5, "foo")
-    this.consumers(0).commitSync(Map((tp, syncMetadata)))
-    assertEquals(syncMetadata, this.consumers(0).committed(tp))
-
-    // async commit
-    val asyncMetadata = new OffsetAndMetadata(10, "bar")
-    val callback = new CountConsumerCommitCallback
-    this.consumers(0).commitAsync(Map((tp, asyncMetadata)), callback)
-    awaitCommitCallback(this.consumers(0), callback)
-
-    assertEquals(asyncMetadata, this.consumers(0).committed(tp))
-  }
-
-  def testPositionAndCommit() {
-    sendRecords(5)
-
-    // committed() on a partition with no committed offset throws an exception
-    intercept[NoOffsetForPartitionException] {
-      this.consumers(0).committed(new TopicPartition(topic, 15))
-    }
-
-    // position() on a partition that we aren't subscribed to throws an 
exception
-    intercept[IllegalArgumentException] {
-      this.consumers(0).position(new TopicPartition(topic, 15))
-    }
-
-    this.consumers(0).assign(List(tp))
-
-    assertEquals("position() on a partition that we are subscribed to should 
reset the offset", 0L, this.consumers(0).position(tp))
-    this.consumers(0).commitSync()
-    assertEquals(0L, this.consumers(0).committed(tp).offset)
-
-    consumeRecords(this.consumers(0), 5, 0)
-    assertEquals("After consuming 5 records, position should be 5", 5L, 
this.consumers(0).position(tp))
-    this.consumers(0).commitSync()
-    assertEquals("Committed offset should be returned", 5L, 
this.consumers(0).committed(tp).offset)
-
-    sendRecords(1)
-
-    // another consumer in the same group should get the same position
-    this.consumers(1).assign(List(tp))
-    consumeRecords(this.consumers(1), 1, 5)
-  }
-
-  @Test
-  def testPartitionsFor() {
-    val numParts = 2
-    TestUtils.createTopic(this.zkUtils, "part-test", numParts, 1, this.servers)
-    val parts = this.consumers(0).partitionsFor("part-test")
-    assertNotNull(parts)
-    assertEquals(2, parts.size)
-    assertNull(this.consumers(0).partitionsFor("non-exist-topic"))
-  }
-
-  @Test
-  def testListTopics() {
-    val numParts = 2
-    val topic1: String = "part-test-topic-1"
-    val topic2: String = "part-test-topic-2"
-    val topic3: String = "part-test-topic-3"
-    TestUtils.createTopic(this.zkUtils, topic1, numParts, 1, this.servers)
-    TestUtils.createTopic(this.zkUtils, topic2, numParts, 1, this.servers)
-    TestUtils.createTopic(this.zkUtils, topic3, numParts, 1, this.servers)
-
-    val topics = this.consumers.head.listTopics()
-    assertNotNull(topics)
-    assertEquals(5, topics.size())
-    assertEquals(5, topics.keySet().size())
-    assertEquals(2, topics.get(topic1).size)
-    assertEquals(2, topics.get(topic2).size)
-    assertEquals(2, topics.get(topic3).size)
-  }
-
-  @Test
-  def testPartitionReassignmentCallback() {
-    val listener = new TestConsumerReassignmentListener()
-    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
"100"); // timeout quickly to avoid slow test
-    
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 
"30");
-    val consumer0 = new KafkaConsumer(this.consumerConfig, new 
ByteArrayDeserializer(), new ByteArrayDeserializer())
-    consumer0.subscribe(List(topic), listener)
-        
-    // the initial subscription should cause a callback execution
-    while(listener.callsToAssigned == 0)
-      consumer0.poll(50)
-    
-    // get metadata for the topic
-    var parts = 
consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName).asScala
-    while(parts == null)
-      parts = 
consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName).asScala
-    assertEquals(1, parts.size)
-    assertNotNull(parts(0).leader())
-    
-    // shutdown the coordinator
-    val coordinator = parts(0).leader().id()
-    this.servers(coordinator).shutdown()
-    
-    // this should cause another callback execution
-    while(listener.callsToAssigned < 2)
-      consumer0.poll(50)
-
-    assertEquals(2, listener.callsToAssigned)
-    assertEquals(2, listener.callsToRevoked)
-
-    consumer0.close()
-  }
-
-  @Test
-  def testUnsubscribeTopic() {
-
-    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
"100"); // timeout quickly to avoid slow test
-    
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 
"30");
-    val consumer0 = new KafkaConsumer(this.consumerConfig, new 
ByteArrayDeserializer(), new ByteArrayDeserializer())
-
-    try {
-      val listener = new TestConsumerReassignmentListener()
-      consumer0.subscribe(List(topic), listener)
-
-      // the initial subscription should cause a callback execution
-      while (listener.callsToAssigned == 0)
-        consumer0.poll(50)
-
-      consumer0.subscribe(List())
-      assertEquals(0, consumer0.assignment.size())
-    } finally {
-      consumer0.close()
-    }
-  }
-
-  @Test
-  def testExpandingTopicSubscriptions() {
-    val otherTopic = "other"
-    val subscriptions = Set(new TopicPartition(topic, 0), new 
TopicPartition(topic, 1))
-    val expandedSubscriptions = subscriptions ++ Set(new 
TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
-    this.consumers(0).subscribe(List(topic))
-    TestUtils.waitUntilTrue(() => {
-      this.consumers(0).poll(50)
-      this.consumers(0).assignment == subscriptions.asJava
-    }, s"Expected partitions ${subscriptions.asJava} but actually got 
${this.consumers(0).assignment}")
-
-    TestUtils.createTopic(this.zkUtils, otherTopic, 2, serverCount, 
this.servers)
-    this.consumers(0).subscribe(List(topic, otherTopic))
-    TestUtils.waitUntilTrue(() => {
-      this.consumers(0).poll(50)
-      this.consumers(0).assignment == expandedSubscriptions.asJava
-    }, s"Expected partitions ${expandedSubscriptions.asJava} but actually got 
${this.consumers(0).assignment}")
-  }
-
-  @Test
-  def testShrinkingTopicSubscriptions() {
-    val otherTopic = "other"
-    TestUtils.createTopic(this.zkUtils, otherTopic, 2, serverCount, 
this.servers)
-    val subscriptions = Set(new TopicPartition(topic, 0), new 
TopicPartition(topic, 1), new TopicPartition(otherTopic, 0), new 
TopicPartition(otherTopic, 1))
-    val shrunkenSubscriptions = Set(new TopicPartition(topic, 0), new 
TopicPartition(topic, 1))
-    this.consumers(0).subscribe(List(topic, otherTopic))
-    TestUtils.waitUntilTrue(() => {
-      this.consumers(0).poll(50)
-      this.consumers(0).assignment == subscriptions.asJava
-    }, s"Expected partitions ${subscriptions.asJava} but actually got 
${this.consumers(0).assignment}")
-
-    this.consumers(0).subscribe(List(topic))
-    TestUtils.waitUntilTrue(() => {
-      this.consumers(0).poll(50)
-      this.consumers(0).assignment == shrunkenSubscriptions.asJava
-    }, s"Expected partitions ${shrunkenSubscriptions.asJava} but actually got 
${this.consumers(0).assignment}")
-  }
-
-  @Test
-  def testPartitionPauseAndResume() {
-    sendRecords(5)
-    this.consumers(0).assign(List(tp))
-    consumeRecords(this.consumers(0), 5, 0)
-    this.consumers(0).pause(tp)
-    sendRecords(5)
-    assertTrue(this.consumers(0).poll(0).isEmpty)
-    this.consumers(0).resume(tp)
-    consumeRecords(this.consumers(0), 5, 5)
-  }
-
-  @Test
-  def testPauseStateNotPreservedByRebalance() {
-    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
"100"); // timeout quickly to avoid slow test
-    
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 
"30");
-    val consumer0 = new KafkaConsumer(this.consumerConfig, new 
ByteArrayDeserializer(), new ByteArrayDeserializer())
-
-    sendRecords(5)
-    consumer0.subscribe(List(topic))
-    consumeRecords(consumer0, 5, 0)
-    consumer0.pause(tp)
-
-    // subscribe to a new topic to trigger a rebalance
-    consumer0.subscribe(List("topic2"))
-
-    // after rebalance, our position should be reset and our pause state lost,
-    // so we should be able to consume from the beginning
-    consumeRecords(consumer0, 0, 5)
-  }
-
-  private class TestConsumerReassignmentListener extends 
ConsumerRebalanceListener {
-    var callsToAssigned = 0
-    var callsToRevoked = 0
-    def onPartitionsAssigned(partitions: java.util.Collection[TopicPartition]) 
{
-      info("onPartitionsAssigned called.")
-      callsToAssigned += 1
-    }
-    def onPartitionsRevoked(partitions: java.util.Collection[TopicPartition]) {
-      info("onPartitionsRevoked called.")
-      callsToRevoked += 1
-    }
-  }
-
-  private def sendRecords(numRecords: Int): Unit = {
-    sendRecords(numRecords, tp)
-  }
-
-  private def sendRecords(numRecords: Int, tp: TopicPartition) {
-    val futures = (0 until numRecords).map { i =>
-      this.producers(0).send(new ProducerRecord(tp.topic(), tp.partition(), 
i.toString.getBytes, i.toString.getBytes))
-    }
-    futures.map(_.get)
-  }
-
-  private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], 
numRecords: Int, startingOffset: Int) {
-    val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
-    val maxIters = numRecords * 300
-    var iters = 0
-    while (records.size < numRecords) {
-      for (record <- consumer.poll(50).asScala)
-        records.add(record)
-      if(iters > maxIters)
-        throw new IllegalStateException("Failed to consume the expected 
records after " + iters + " iterations.")
-      iters += 1
-    }
-    for (i <- 0 until numRecords) {
-      val record = records.get(i)
-      val offset = startingOffset + i
-      assertEquals(topic, record.topic())
-      assertEquals(part, record.partition())
-      assertEquals(offset.toLong, record.offset())
-    }
-  }
-
-  private def awaitCommitCallback(consumer: Consumer[Array[Byte], 
Array[Byte]], commitCallback: CountConsumerCommitCallback): Unit = {
-    val startCount = commitCallback.count
-    val started = System.currentTimeMillis()
-    while (commitCallback.count == startCount && System.currentTimeMillis() - 
started < 10000)
-      this.consumers(0).poll(50)
-    assertEquals(startCount + 1, commitCallback.count)
-  }
-
-  private class CountConsumerCommitCallback extends OffsetCommitCallback {
-    var count = 0
-
-    override def onComplete(offsets: util.Map[TopicPartition, 
OffsetAndMetadata], exception: Exception): Unit = count += 1
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 77fcd8b..2ec59fb 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -23,7 +23,7 @@ import kafka.utils.TestUtils
 import java.util.Properties
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.KafkaProducer
-import kafka.server.{OffsetManager, KafkaConfig}
+import kafka.server.KafkaConfig
 import kafka.integration.KafkaServerTestHarness
 import org.junit.{After, Before}
 import scala.collection.mutable.Buffer
@@ -41,11 +41,12 @@ trait IntegrationTestHarness extends KafkaServerTestHarness 
{
   lazy val consumerConfig = new Properties
   lazy val serverConfig = new Properties
 
-  var consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
-  var producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
+  val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
+  val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
 
   override def generateConfigs() = {
-    val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect)
+    val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, 
interBrokerSecurityProtocol = Some(securityProtocol),
+      trustStoreFile = trustStoreFile)
     cfgs.foreach(_.putAll(serverConfig))
     cfgs.map(KafkaConfig.fromProps)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
new file mode 100644
index 0000000..335d585
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -0,0 +1,15 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+  * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+  * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+  * License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+  * specific language governing permissions and limitations under the License.
+  */
+package kafka.api
+
+class PlaintextConsumerTest extends BaseConsumerTest

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
new file mode 100644
index 0000000..d017d13
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -0,0 +1,54 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import java.util.Properties
+
+import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer}
+import org.apache.kafka.common.config.ConfigException
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.junit.Test
+
+class PlaintextProducerSendTest extends BaseProducerSendTest {
+
+  @Test
+  def testSerializerConstructors() {
+    try {
+      createNewProducerWithNoSerializer(brokerList)
+      fail("Instantiating a producer without specifying a serializer should 
cause a ConfigException")
+    } catch {
+      case ce : ConfigException => // this is ok
+    }
+
+    // create a producer with explicit serializers should succeed
+    createNewProducerWithExplicitSerializer(brokerList)
+  }
+
+  private def createNewProducerWithNoSerializer(brokerList: String) : 
KafkaProducer[Array[Byte],Array[Byte]] = {
+    val producerProps = new Properties()
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
+  }
+
+  private def createNewProducerWithExplicitSerializer(brokerList: String) : 
KafkaProducer[Array[Byte],Array[Byte]] = {
+    val producerProps = new Properties()
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    return new KafkaProducer[Array[Byte],Array[Byte]](producerProps, new 
ByteArraySerializer, new ByteArraySerializer)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
deleted file mode 100644
index 3aef172..0000000
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ /dev/null
@@ -1,416 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import java.util.Properties
-import java.util.concurrent.TimeUnit
-
-import kafka.consumer.SimpleConsumer
-import kafka.integration.KafkaServerTestHarness
-import kafka.message.Message
-import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
-import org.apache.kafka.clients.producer._
-import org.apache.kafka.common.config.ConfigException
-import org.apache.kafka.common.errors.SerializationException
-import org.apache.kafka.common.serialization.ByteArraySerializer
-import org.junit.Assert._
-import org.junit.{After, Before, Test}
-
-class ProducerSendTest extends KafkaServerTestHarness {
-  val numServers = 2
-
-  val overridingProps = new Properties()
-  overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString)
-
-  def generateConfigs() =
-    TestUtils.createBrokerConfigs(numServers, zkConnect, 
false).map(KafkaConfig.fromProps(_, overridingProps))
-
-  private var consumer1: SimpleConsumer = null
-  private var consumer2: SimpleConsumer = null
-
-  private val topic = "topic"
-  private val numRecords = 100
-
-  @Before
-  override def setUp() {
-    super.setUp()
-
-    // TODO: we need to migrate to new consumers when 0.9 is final
-    consumer1 = new SimpleConsumer("localhost", servers(0).boundPort(), 100, 
1024*1024, "")
-    consumer2 = new SimpleConsumer("localhost", servers(1).boundPort(), 100, 
1024*1024, "")
-  }
-
-  @After
-  override def tearDown() {
-    consumer1.close()
-    consumer2.close()
-
-    super.tearDown()
-  }
-
-  /**
-   * testSendOffset checks the basic send API behavior
-   *
-   * 1. Send with null key/value/partition-id should be accepted; send with 
null topic should be rejected.
-   * 2. Last message of the non-blocking send should return the correct offset 
metadata
-   */
-  @Test
-  def testSendOffset() {
-    var producer = TestUtils.createNewProducer(brokerList)
-    val partition = new Integer(0)
-
-    object callback extends Callback {
-      var offset = 0L
-      def onCompletion(metadata: RecordMetadata, exception: Exception) {
-        if (exception == null) {
-          assertEquals(offset, metadata.offset())
-          assertEquals(topic, metadata.topic())
-          assertEquals(partition, metadata.partition())
-          offset += 1
-        } else {
-          fail("Send callback returns the following exception", exception)
-        }
-      }
-    }
-
-    try {
-      // create topic
-      TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
-
-      // send a normal record
-      val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, 
partition, "key".getBytes, "value".getBytes)
-      assertEquals("Should have offset 0", 0L, producer.send(record0, 
callback).get.offset)
-
-      // send a record with null value should be ok
-      val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic, 
partition, "key".getBytes, null)
-      assertEquals("Should have offset 1", 1L, producer.send(record1, 
callback).get.offset)
-
-      // send a record with null key should be ok
-      val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic, 
partition, null, "value".getBytes)
-      assertEquals("Should have offset 2", 2L, producer.send(record2, 
callback).get.offset)
-
-      // send a record with null part id should be ok
-      val record3 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, 
"key".getBytes, "value".getBytes)
-      assertEquals("Should have offset 3", 3L, producer.send(record3, 
callback).get.offset)
-
-      // send a record with null topic should fail
-      try {
-        val record4 = new ProducerRecord[Array[Byte],Array[Byte]](null, 
partition, "key".getBytes, "value".getBytes)
-        producer.send(record4, callback)
-        fail("Should not allow sending a record without topic")
-      } catch {
-        case iae: IllegalArgumentException => // this is ok
-        case e: Throwable => fail("Only expecting IllegalArgumentException", e)
-      }
-
-      // non-blocking send a list of records
-      for (i <- 1 to numRecords)
-        producer.send(record0, callback)
-
-      // check that all messages have been acked via offset
-      assertEquals("Should have offset " + (numRecords + 4), numRecords + 4L, 
producer.send(record0, callback).get.offset)
-
-    } finally {
-      if (producer != null) {
-        producer.close()
-        producer = null
-      }
-    }
-  }
-
-  @Test
-  def testSerializer() {
-    // send a record with a wrong type should receive a serialization exception
-    try {
-      val producer = createNewProducerWithWrongSerializer(brokerList)
-      val record5 = new ProducerRecord[Array[Byte],Array[Byte]](topic, new 
Integer(0), "key".getBytes, "value".getBytes)
-      producer.send(record5)
-      fail("Should have gotten a SerializationException")
-    } catch {
-      case se: SerializationException => // this is ok
-    }
-
-    try {
-      createNewProducerWithNoSerializer(brokerList)
-      fail("Instantiating a producer without specifying a serializer should 
cause a ConfigException")
-    } catch {
-      case ce : ConfigException => // this is ok
-    }
-
-    // create a producer with explicit serializers should succeed
-    createNewProducerWithExplicitSerializer(brokerList)
-  }
-
-  private def createNewProducerWithWrongSerializer(brokerList: String) : 
KafkaProducer[Array[Byte],Array[Byte]] = {
-    import org.apache.kafka.clients.producer.ProducerConfig
-
-    val producerProps = new Properties()
-    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
-    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
-    return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
-  }
-
-  private def createNewProducerWithNoSerializer(brokerList: String) : 
KafkaProducer[Array[Byte],Array[Byte]] = {
-    import org.apache.kafka.clients.producer.ProducerConfig
-
-    val producerProps = new Properties()
-    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
-  }
-
-  private def createNewProducerWithExplicitSerializer(brokerList: String) : 
KafkaProducer[Array[Byte],Array[Byte]] = {
-    import org.apache.kafka.clients.producer.ProducerConfig
-
-    val producerProps = new Properties()
-    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    return new KafkaProducer[Array[Byte],Array[Byte]](producerProps, new 
ByteArraySerializer, new ByteArraySerializer)
-  }
-
-  /**
-   * testClose checks the closing behavior
-   *
-   * After close() returns, all messages should be sent with correct returned 
offset metadata
-   */
-  @Test
-  def testClose() {
-    var producer = TestUtils.createNewProducer(brokerList)
-
-    try {
-      // create topic
-      TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
-
-      // non-blocking send a list of records
-      val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, 
"key".getBytes, "value".getBytes)
-      for (i <- 1 to numRecords)
-        producer.send(record0)
-      val response0 = producer.send(record0)
-
-      // close the producer
-      producer.close()
-      producer = null
-
-      // check that all messages have been acked via offset,
-      // this also checks that messages with same key go to the same partition
-      assertTrue("The last message should be acked before producer is 
shutdown", response0.isDone)
-      assertEquals("Should have offset " + numRecords, numRecords.toLong, 
response0.get.offset)
-
-    } finally {
-      if (producer != null) {
-        producer.close()
-        producer = null
-      }
-    }
-  }
-
-  /**
-   * testSendToPartition checks the partitioning behavior
-   *
-   * The specified partition-id should be respected
-   */
-  @Test
-  def testSendToPartition() {
-    var producer = TestUtils.createNewProducer(brokerList)
-
-    try {
-      // create topic
-      val leaders = TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
-      val partition = 1
-
-      // make sure leaders exist
-      val leader1 = leaders(partition)
-      assertTrue("Leader for topic \"topic\" partition 1 should exist", 
leader1.isDefined)
-
-      val responses =
-        for (i <- 1 to numRecords)
-          yield producer.send(new 
ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, ("value" + 
i).getBytes))
-      val futures = responses.toList
-      futures.map(_.get)
-      for (future <- futures)
-        assertTrue("Request should have completed", future.isDone)
-
-      // make sure all of them end up in the same partition with increasing 
offset values
-      for ((future, offset) <- futures zip (0 until numRecords)) {
-        assertEquals(offset.toLong, future.get.offset)
-        assertEquals(topic, future.get.topic)
-        assertEquals(partition, future.get.partition)
-      }
-
-      // make sure the fetched messages also respect the partitioning and 
ordering
-      val fetchResponse1 = if(leader1.get == configs(0).brokerId) {
-        consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 
0, Int.MaxValue).build())
-      } else {
-        consumer2.fetch(new FetchRequestBuilder().addFetch(topic, partition, 
0, Int.MaxValue).build())
-      }
-      val messageSet1 = fetchResponse1.messageSet(topic, 
partition).iterator.toBuffer
-      assertEquals("Should have fetched " + numRecords + " messages", 
numRecords, messageSet1.size)
-
-      // TODO: also check topic and partition after they are added in the 
return messageSet
-      for (i <- 0 to numRecords - 1) {
-        assertEquals(new Message(bytes = ("value" + (i + 1)).getBytes), 
messageSet1(i).message)
-        assertEquals(i.toLong, messageSet1(i).offset)
-      }
-    } finally {
-      if (producer != null) {
-        producer.close()
-        producer = null
-      }
-    }
-  }
-
-  /**
-   * testAutoCreateTopic
-   *
-   * The topic should be created upon sending the first message
-   */
-  @Test
-  def testAutoCreateTopic() {
-    var producer = TestUtils.createNewProducer(brokerList, retries = 5)
-
-    try {
-      // Send a message to auto-create the topic
-      val record = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, 
"key".getBytes, "value".getBytes)
-      assertEquals("Should have offset 0", 0L, 
producer.send(record).get.offset)
-
-      // double check that the topic is created with leader elected
-      TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
-
-    } finally {
-      if (producer != null) {
-        producer.close()
-        producer = null
-      }
-    }
-  }
-
-  /**
-   * Test that flush immediately sends all accumulated requests.
-   */
-  @Test
-  def testFlush() {
-    var producer = TestUtils.createNewProducer(brokerList, lingerMs = 
Long.MaxValue)
-    try {
-      TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
-      val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, 
"value".getBytes)
-      for(i <- 0 until 50) {
-        val responses = (0 until numRecords) map (i => producer.send(record))
-        assertTrue("No request is complete.", responses.forall(!_.isDone()))
-        producer.flush()
-        assertTrue("All requests are complete.", responses.forall(_.isDone()))
-      }
-    } finally {
-      if (producer != null)
-        producer.close()
-    }
-  }
-
-  /**
-   * Test close with zero timeout from caller thread
-   */
-  @Test
-  def testCloseWithZeroTimeoutFromCallerThread() {
-    var producer: KafkaProducer[Array[Byte],Array[Byte]] = null
-    try {
-      // create topic
-      val leaders = TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
-      val leader0 = leaders(0)
-      val leader1 = leaders(1)
-
-      // create record
-      val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, 
null, "value".getBytes)
-      val record1 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 1, 
null, "value".getBytes)
-
-      // Test closing from caller thread.
-      for(i <- 0 until 50) {
-        producer = TestUtils.createNewProducer(brokerList, lingerMs = 
Long.MaxValue)
-        val responses = (0 until numRecords) map (i => producer.send(record0))
-        assertTrue("No request is complete.", responses.forall(!_.isDone()))
-        producer.close(0, TimeUnit.MILLISECONDS)
-        responses.foreach { future =>
-          try {
-            future.get()
-            fail("No message should be sent successfully.")
-          } catch {
-            case e: Exception =>
-              assertEquals("java.lang.IllegalStateException: Producer is 
closed forcefully.", e.getMessage)
-          }
-        }
-        val fetchResponse = if (leader0.get == configs(0).brokerId) {
-          consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 
Int.MaxValue).build())
-        } else {
-          consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 
Int.MaxValue).build())
-        }
-        assertEquals("Fetch response should have no message returned.", 0, 
fetchResponse.messageSet(topic, 0).size)
-      }
-    } finally {
-      if (producer != null)
-        producer.close()
-    }
-  }
-
-  /**
-   * Test close with zero and non-zero timeout from sender thread
-   */
-  @Test
-  def testCloseWithZeroTimeoutFromSenderThread() {
-    var producer: KafkaProducer[Array[Byte],Array[Byte]] = null
-    try {
-      // create topic
-      val leaders = TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
-      val leader = leaders(0)
-
-      // create record
-      val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, 
null, "value".getBytes)
-
-      // Test closing from sender thread.
-      class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]]) 
extends Callback {
-        override def onCompletion(metadata: RecordMetadata, exception: 
Exception) {
-          // Trigger another batch in accumulator before close the producer. 
These messages should
-          // not be sent.
-          (0 until numRecords) map (i => producer.send(record))
-          // The close call will be called by all the message callbacks. This 
tests idempotence of the close call.
-          producer.close(0, TimeUnit.MILLISECONDS)
-          // Test close with non zero timeout. Should not block at all.
-          producer.close(Long.MaxValue, TimeUnit.MICROSECONDS)
-        }
-      }
-      for(i <- 0 until 50) {
-        producer = TestUtils.createNewProducer(brokerList, lingerMs = 
Long.MaxValue)
-        // send message to partition 0
-        val responses = ((0 until numRecords) map (i => producer.send(record, 
new CloseCallback(producer))))
-        assertTrue("No request is complete.", responses.forall(!_.isDone()))
-        // flush the messages.
-        producer.flush()
-        assertTrue("All request are complete.", responses.forall(_.isDone()))
-        // Check the messages received by broker.
-        val fetchResponse = if (leader.get == configs(0).brokerId) {
-          consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 
Int.MaxValue).build())
-        } else {
-          consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 
Int.MaxValue).build())
-        }
-        val expectedNumRecords = (i + 1) * numRecords
-        assertEquals("Fetch response to partition 0 should have %d 
messages.".format(expectedNumRecords),
-          expectedNumRecords, fetchResponse.messageSet(topic, 0).size)
-      }
-    } finally {
-      if (producer != null)
-        producer.close()
-    }
-  }
-}

Reply via email to