Repository: kafka
Updated Branches:
refs/heads/trunk aebba89a2 -> 9323a7533
MINOR: Use new consumer in ProducerCompressionTest
This should be less flaky as it has a higher timeout. I also increased the
timeout in a couple of other tests that had very low (100 ms) timeouts.
The failure would manifest itself as:
```text
java.net.SocketTimeoutException
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:85)
at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:100)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:84)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:133)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:133)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:133)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:132)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:132)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:132)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:131)
at kafka.api.test.ProducerCompressionTest.testCompression(ProducerCompressionTest.scala:97)
```
Author: Ismael Juma <[email protected]>
Reviewers: Rajini Sivaram <[email protected]>
Closes #3178 from ijuma/producer-compression-test-flaky
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9323a753
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9323a753
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9323a753
Branch: refs/heads/trunk
Commit: 9323a753355abcb62fe8121303e5e65dbfe018ea
Parents: aebba89
Author: Ismael Juma <[email protected]>
Authored: Wed May 31 16:13:30 2017 +0100
Committer: Ismael Juma <[email protected]>
Committed: Wed May 31 16:13:30 2017 +0100
----------------------------------------------------------------------
.../kafka/api/BaseProducerSendTest.scala | 6 +-
.../kafka/api/ProducerBounceTest.scala | 2 +-
.../kafka/api/ProducerCompressionTest.scala | 88 +++++++++-----------
.../unit/kafka/producer/ProducerTest.scala | 2 +-
4 files changed, 44 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9323a753/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git
a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 42e3b11..645f6ac 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -187,13 +187,13 @@ abstract class BaseProducerSendTest extends
KafkaServerTestHarness {
try {
TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
- val recordAndFutures = for (i <- 1 to numRecords) yield {
+ val futures = for (i <- 1 to numRecords) yield {
val record = new ProducerRecord(topic, partition,
s"key$i".getBytes(StandardCharsets.UTF_8),
s"value$i".getBytes(StandardCharsets.UTF_8))
- (record, producer.send(record))
+ producer.send(record)
}
producer.close(timeoutMs, TimeUnit.MILLISECONDS)
- val lastOffset = recordAndFutures.foldLeft(0) { case (offset, (record,
future)) =>
+ val lastOffset = futures.foldLeft(0) { (offset, future) =>
val recordMetadata = future.get
assertEquals(topic, recordMetadata.topic)
assertEquals(partition, recordMetadata.partition)
http://git-wip-us.apache.org/repos/asf/kafka/blob/9323a753/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
index 5fead18..9fe0e5c 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
@@ -101,7 +101,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
val newLeaders = (0 until numPartitions).map(i =>
TestUtils.waitUntilMetadataIsPropagated(servers, topic1, i))
val fetchResponses = newLeaders.zipWithIndex.map { case (leader,
partition) =>
// Consumers must be instantiated after all the restarts since they use
random ports each time they start up
- val consumer = new SimpleConsumer("localhost",
boundPort(servers(leader)), 100, 1024 * 1024, "")
+ val consumer = new SimpleConsumer("localhost",
boundPort(servers(leader)), 30000, 1024 * 1024, "")
val response = consumer.fetch(new FetchRequestBuilder().addFetch(topic1,
partition, 0, Int.MaxValue).build()).messageSet(topic1, partition)
consumer.close
response
http://git-wip-us.apache.org/repos/asf/kafka/blob/9323a753/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
----------------------------------------------------------------------
diff --git
a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index 2001095..23b78b0 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -17,7 +17,8 @@
package kafka.api.test
-import java.util.{ArrayList, Collection, Properties}
+import java.util.{Collection, Collections, Properties}
+import scala.collection.JavaConverters._
import org.junit.runners.Parameterized
import org.junit.runner.RunWith
@@ -25,30 +26,27 @@ import org.junit.runners.Parameterized.Parameters
import org.junit.{After, Before, Test}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,
ProducerRecord}
import org.junit.Assert._
-import kafka.api.FetchRequestBuilder
import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.consumer.SimpleConsumer
-import kafka.message.Message
import kafka.zk.ZooKeeperTestHarness
import kafka.utils.TestUtils
-
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.serialization.ByteArraySerializer
@RunWith(value = classOf[Parameterized])
class ProducerCompressionTest(compression: String) extends
ZooKeeperTestHarness {
- private val brokerId = 0
- private var server: KafkaServer = null
+ private val brokerId = 0
private val topic = "topic"
private val numRecords = 2000
+ private var server: KafkaServer = null
+
@Before
override def setUp() {
super.setUp()
-
val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
- val config = KafkaConfig.fromProps(props)
-
- server = TestUtils.createServer(config)
+ server = TestUtils.createServer(KafkaConfig.fromProps(props))
}
@After
@@ -65,15 +63,14 @@ class ProducerCompressionTest(compression: String) extends
ZooKeeperTestHarness
@Test
def testCompression() {
- val props = new Properties()
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
TestUtils.getBrokerListStrFromServers(Seq(server)))
- props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
- props.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000")
- props.put(ProducerConfig.LINGER_MS_CONFIG, "200")
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer")
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer")
- var producer = new KafkaProducer[Array[Byte],Array[Byte]](props)
- val consumer = new SimpleConsumer("localhost",
TestUtils.boundPort(server), 100, 1024*1024, "")
+ val producerProps = new Properties()
+ val bootstrapServers = TestUtils.getBrokerListStrFromServers(Seq(server))
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers)
+ producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
+ producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000")
+ producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "200")
+ val producer = new KafkaProducer(producerProps, new ByteArraySerializer,
new ByteArraySerializer)
+ val consumer = TestUtils.createNewConsumer(bootstrapServers,
securityProtocol = SecurityProtocol.PLAINTEXT)
try {
// create topic
@@ -81,50 +78,43 @@ class ProducerCompressionTest(compression: String) extends
ZooKeeperTestHarness
val partition = 0
// prepare the messages
- val messages = for (i <-0 until numRecords)
- yield ("value" + i).getBytes
+ val messageValues = (0 until numRecords).map(i => "value" + i)
// make sure the returned messages are correct
val now = System.currentTimeMillis()
- val responses = for (message <- messages)
- yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic,
null, now, null, message))
- val futures = responses.toList
- for ((future, offset) <- futures zip (0 until numRecords)) {
+ val responses = for (message <- messageValues)
+ yield producer.send(new ProducerRecord(topic, null, now, null,
message.getBytes))
+ for ((future, offset) <- responses.zipWithIndex) {
assertEquals(offset.toLong, future.get.offset)
}
+ val tp = new TopicPartition(topic, partition)
// make sure the fetched message count match
- val fetchResponse = consumer.fetch(new
FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
- val messageSet = fetchResponse.messageSet(topic,
partition).iterator.toBuffer
- assertEquals("Should have fetched " + numRecords + " messages",
numRecords, messageSet.size)
-
- var index = 0
- for (message <- messages) {
- assertEquals(new Message(bytes = message, now, Message.MagicValue_V1),
messageSet(index).message)
- assertEquals(index.toLong, messageSet(index).offset)
- index += 1
+ consumer.assign(Collections.singleton(tp))
+ consumer.seek(tp, 0)
+ val records = TestUtils.consumeRecords(consumer, numRecords)
+
+ for (((messageValue, record), index) <-
messageValues.zip(records).zipWithIndex) {
+ assertEquals(messageValue, new String(record.value))
+ assertEquals(now, record.timestamp)
+ assertEquals(index.toLong, record.offset)
}
} finally {
- if (producer != null) {
- producer.close()
- producer = null
- }
- if (consumer != null)
- consumer.close()
+ producer.close()
+ consumer.close()
}
}
}
object ProducerCompressionTest {
- // NOTE: Must return collection of Array[AnyRef] (NOT Array[Any]).
- @Parameters
+ @Parameters(name = "{index} compressionType = {0}")
def parameters: Collection[Array[String]] = {
- val list = new ArrayList[Array[String]]()
- list.add(Array("none"))
- list.add(Array("gzip"))
- list.add(Array("snappy"))
- list.add(Array("lz4"))
- list
+ Seq(
+ Array("none"),
+ Array("gzip"),
+ Array("snappy"),
+ Array("lz4")
+ ).asJava
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9323a753/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 1d3f77f..b2b9806 100755
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -59,7 +59,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
def getConsumer2() = {
if (consumer2 == null)
- consumer2 = new SimpleConsumer("localhost",
TestUtils.boundPort(server2), 100, 64*1024, "")
+ consumer2 = new SimpleConsumer("localhost",
TestUtils.boundPort(server2), 1000000, 64*1024, "")
consumer2
}