Repository: kafka Updated Branches: refs/heads/trunk 941e2177c -> 9b58372dc
KAFKA-5371; Increase request timeout for producer used by testReachableServer 500ms is low for a shared Jenkins environment. Also removed the try/catch blocks that simply obscured the underlying error. Author: Ismael Juma <[email protected]> Reviewers: Apurva Mehta <[email protected]>, Rajini Sivaram <[email protected]> Closes #3225 from ijuma/kafka-5371-flaky-testReachableServer Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9b58372d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9b58372d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9b58372d Branch: refs/heads/trunk Commit: 9b58372dcce5dc96826d4786123513a4d8c7b39f Parents: 941e217 Author: Ismael Juma <[email protected]> Authored: Sun Jun 4 15:28:06 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Sun Jun 4 15:28:10 2017 +0100 ---------------------------------------------------------------------- .../unit/kafka/producer/SyncProducerTest.scala | 41 +++++++------------- .../test/scala/unit/kafka/utils/TestUtils.scala | 3 +- 2 files changed, 16 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9b58372d/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index 41a8a6c..cde49de 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -52,38 +52,25 @@ class SyncProducerTest extends KafkaServerTestHarness { @Test def testReachableServer() { val server = servers.head - val props = TestUtils.getSyncProducerConfig(boundPort(server)) - val producer = new SyncProducer(new SyncProducerConfig(props)) + val firstStart = Time.SYSTEM.milliseconds - try { - val response = producer.send(produceRequest("test", 0, - new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1)) - assertNotNull(response) - } catch { - case e: Exception => fail("Unexpected failure sending message to broker. " + e.getMessage) - } - val firstEnd = Time.SYSTEM.milliseconds - assertTrue((firstEnd-firstStart) < 2000) + var response = producer.send(produceRequest("test", 0, + new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1)) + assertNotNull(response) + assertTrue((Time.SYSTEM.milliseconds - firstStart) < 12000) + val secondStart = Time.SYSTEM.milliseconds - try { - val response = producer.send(produceRequest("test", 0, - new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1)) - assertNotNull(response) - } catch { - case e: Exception => fail("Unexpected failure sending message to broker. " + e.getMessage) - } - val secondEnd = Time.SYSTEM.milliseconds - assertTrue((secondEnd-secondStart) < 2000) - try { - val response = producer.send(produceRequest("test", 0, - new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1)) - assertNotNull(response) - } catch { - case e: Exception => fail("Unexpected failure sending message to broker. " + e.getMessage) - } + response = producer.send(produceRequest("test", 0, + new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1)) + assertNotNull(response) + assertTrue((Time.SYSTEM.milliseconds - secondStart) < 12000) + + response = producer.send(produceRequest("test", 0, + new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1)) + assertNotNull(response) } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/9b58372d/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index a0f4762..aae58cc 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -643,11 +643,12 @@ object TestUtils extends Logging { props } + @deprecated("This method has been deprecated and will be removed in a future release", "0.11.0.0") def getSyncProducerConfig(port: Int): Properties = { val props = new Properties() props.put("host", "localhost") props.put("port", port.toString) - props.put("request.timeout.ms", "500") + props.put("request.timeout.ms", "10000") props.put("request.required.acks", "1") props.put("serializer.class", classOf[StringEncoder].getName) props
