This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new e1961b8 MINOR: Update code to not use deprecated methods (#6434) e1961b8 is described below commit e1961b8298c6d7ec028cd95721f4b498689c506e Author: Matthias J. Sax <mj...@apache.org> AuthorDate: Fri Mar 15 19:47:40 2019 -0700 MINOR: Update code to not use deprecated methods (#6434) Reviewers: Bill Bejeck <b...@confluent.io>, John Roesler <j...@confluent.io>, Colin P. McCabe <cmcc...@confluent.io> --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 4 ++-- .../integration/kafka/api/AdminClientIntegrationTest.scala | 10 +++++----- .../scala/integration/kafka/api/BaseProducerSendTest.scala | 13 +++++++------ .../test/scala/integration/kafka/api/BaseQuotaTest.scala | 7 ++++--- .../scala/integration/kafka/api/ConsumerBounceTest.scala | 11 ++++++----- .../integration/kafka/api/IntegrationTestHarness.scala | 2 +- .../test/scala/integration/kafka/api/TransactionsTest.scala | 3 ++- .../kafka/server/DynamicBrokerReconfigurationTest.scala | 4 ++-- 8 files changed, 29 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 823f0fd..fda1812 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -324,7 +324,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { // uncommitted record since last poll. Using one second as poll's timeout ensures that // offsetCommitIntervalMs, of value greater than 1 second, does not see delays in offset // commit. 
- recordIter = consumer.poll(Duration.ofSeconds(1)).iterator + recordIter = consumer.poll(Duration.ofSeconds(1L)).iterator if (!recordIter.hasNext) throw new NoRecordsException } @@ -387,7 +387,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } def close(timeout: Long) { - this.producer.close(timeout, TimeUnit.MILLISECONDS) + this.producer.close(Duration.ofMillis(timeout)) } } diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index cf019a8..027266d 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -16,7 +16,7 @@ */ package kafka.api -import java.util +import java.{time, util} import java.util.{Collections, Properties} import java.util.Arrays.asList import java.util.concurrent.{ExecutionException, TimeUnit} @@ -1066,11 +1066,11 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { val topics = Seq("mytopic", "mytopic2") val newTopics = topics.map(new NewTopic(_, 1, 1)) val future = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all() - client.close(2, TimeUnit.HOURS) + client.close(time.Duration.ofHours(2)) val future2 = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all() assertFutureExceptionTypeEquals(future2, classOf[TimeoutException]) future.get - client.close(30, TimeUnit.MINUTES) // multiple close-with-timeout should have no effect + client.close(time.Duration.ofMinutes(30)) // multiple close-with-timeout should have no effect } /** @@ -1086,7 +1086,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { // cancelled by the close operation. 
val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1)).asJava, new CreateTopicsOptions().timeoutMs(900000)).all() - client.close(0, TimeUnit.MILLISECONDS) + client.close(time.Duration.ZERO) assertFutureExceptionTypeEquals(future, classOf[TimeoutException]) } @@ -1164,7 +1164,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { override def run { consumer.subscribe(Collections.singleton(testTopicName)) while (true) { - consumer.poll(5000) + consumer.poll(time.Duration.ofSeconds(5L)) consumer.commitSync() } } diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala index 9d454e9..2ce16d2 100644 --- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala @@ -17,6 +17,7 @@ package kafka.api +import java.time.Duration import java.nio.charset.StandardCharsets import java.util.Properties import java.util.concurrent.TimeUnit @@ -193,7 +194,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { s"value$i".getBytes(StandardCharsets.UTF_8)) producer.send(record) } - producer.close(timeoutMs, TimeUnit.MILLISECONDS) + producer.close(Duration.ofMillis(timeoutMs)) val lastOffset = futures.foldLeft(0) { (offset, future) => val recordMetadata = future.get assertEquals(topic, recordMetadata.topic) @@ -248,7 +249,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { s"value$i".getBytes(StandardCharsets.UTF_8)) (record, producer.send(record, callback)) } - producer.close(20000L, TimeUnit.MILLISECONDS) + producer.close(Duration.ofSeconds(20L)) recordAndFutures.foreach { case (record, future) => val recordMetadata = future.get if (timestampType == TimestampType.LOG_APPEND_TIME) @@ -445,7 +446,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { val producer = 
createProducer(brokerList, lingerMs = Int.MaxValue) val responses = (0 until numRecords) map (_ => producer.send(record0)) assertTrue("No request is complete.", responses.forall(!_.isDone())) - producer.close(0, TimeUnit.MILLISECONDS) + producer.close(Duration.ZERO) responses.foreach { future => try { future.get() @@ -454,7 +455,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { case e: ExecutionException => assertEquals(classOf[KafkaException], e.getCause.getClass) } } - assertEquals("Fetch response should have no message returned.", 0, consumer.poll(50).count) + assertEquals("Fetch response should have no message returned.", 0, consumer.poll(Duration.ofMillis(50L)).count) } } @@ -476,9 +477,9 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { if (sendRecords) (0 until numRecords) foreach (_ => producer.send(record)) // The close call will be called by all the message callbacks. This tests idempotence of the close call. - producer.close(0, TimeUnit.MILLISECONDS) + producer.close(Duration.ZERO) // Test close with non zero timeout. Should not block at all. 
- producer.close(Long.MaxValue, TimeUnit.MICROSECONDS) + producer.close() } } for (i <- 0 until 50) { diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala index b28a40f..4b278f0 100644 --- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala @@ -14,6 +14,7 @@ package kafka.api +import java.time.Duration import java.util.{Collections, HashMap, Properties} import kafka.api.QuotaTestClients._ @@ -140,7 +141,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness { val endTimeMs = System.currentTimeMillis + 10000 var throttled = false while ((!throttled || quotaTestClients.exemptRequestMetric == null) && System.currentTimeMillis < endTimeMs) { - consumer.poll(100) + consumer.poll(Duration.ofMillis(100L)) val throttleMetric = quotaTestClients.throttleMetric(QuotaType.Request, consumerClientId) throttled = throttleMetric != null && metricValue(throttleMetric) > 0 } @@ -197,7 +198,7 @@ abstract class QuotaTestClients(topic: String, var numConsumed = 0 var throttled = false do { - numConsumed += consumer.poll(100).count + numConsumed += consumer.poll(Duration.ofMillis(100L)).count val metric = throttleMetric(QuotaType.Fetch, consumerClientId) throttled = metric != null && metricValue(metric) > 0 } while (numConsumed < maxRecords && !throttled) @@ -206,7 +207,7 @@ abstract class QuotaTestClients(topic: String, if (throttled && numConsumed < maxRecords && waitForRequestCompletion) { val minRecords = numConsumed + 1 while (numConsumed < minRecords) - numConsumed += consumer.poll(100).count + numConsumed += consumer.poll(Duration.ofMillis(100L)).count } numConsumed } diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala index e535104..1a1f37e 100644 --- 
a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala @@ -178,7 +178,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging { executor.schedule(new Runnable { def run() = createTopic(newtopic, numPartitions = numBrokers, replicationFactor = numBrokers) }, 2, TimeUnit.SECONDS) - consumer.poll(0) + consumer.poll(time.Duration.ZERO) val producer = createProducer() @@ -481,14 +481,15 @@ class ConsumerBounceTest extends BaseRequestTest with Logging { revokeSemaphore.foreach(s => s.release()) } }) - consumer.poll(0) + // requires the deprecated `poll(long)` to trigger a metadata update + consumer.poll(0L) }, 0) } def waitForRebalance(timeoutMs: Long, future: Future[Any], otherConsumers: KafkaConsumer[Array[Byte], Array[Byte]]*) { val startMs = System.currentTimeMillis while (System.currentTimeMillis < startMs + timeoutMs && !future.isDone) - otherConsumers.foreach(consumer => consumer.poll(100)) + otherConsumers.foreach(consumer => consumer.poll(time.Duration.ofMillis(100L))) assertTrue("Rebalance did not complete in time", future.isDone) } @@ -569,7 +570,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging { val closeGraceTimeMs = 2000 val startNanos = System.nanoTime info("Closing consumer with timeout " + closeTimeoutMs + " ms.") - consumer.close(closeTimeoutMs, TimeUnit.MILLISECONDS) + consumer.close(time.Duration.ofMillis(closeTimeoutMs)) val timeTakenMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime - startNanos) maxCloseTimeMs.foreach { ms => assertTrue("Close took too long " + timeTakenMs, timeTakenMs < ms + closeGraceTimeMs) @@ -592,7 +593,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging { } def onPartitionsRevoked(partitions: Collection[TopicPartition]) { }}) - consumer.poll(3000) + consumer.poll(time.Duration.ofSeconds(3L)) assertTrue("Assignment did not complete on time", assignSemaphore.tryAcquire(1, TimeUnit.SECONDS)) if
(committedRecords > 0) assertEquals(committedRecords, consumer.committed(tp).offset) diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 5a20005..640244d 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -126,7 +126,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { @After override def tearDown() { - producers.foreach(_.close(0, TimeUnit.MILLISECONDS)) + producers.foreach(_.close(Duration.ZERO)) consumers.foreach(_.wakeup()) consumers.foreach(_.close(Duration.ZERO)) producers.clear() diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index 34dea70..e3bfdc0 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -18,6 +18,7 @@ package kafka.api import java.lang.{Long => JLong} +import java.time.Duration import java.util.{Optional, Properties} import java.util.concurrent.TimeUnit @@ -578,7 +579,7 @@ class TransactionsTest extends KafkaServerTestHarness { try { producer.commitTransaction() } finally { - producer.close(0, TimeUnit.MILLISECONDS) + producer.close(Duration.ZERO) } } diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index c14f41e..e8aa081 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -146,7 +146,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet clientThreads.foreach(_.initiateShutdown()) clientThreads.foreach(_.join(5 
* 1000)) executors.foreach(_.shutdownNow()) - producers.foreach(_.close(0, TimeUnit.MILLISECONDS)) + producers.foreach(_.close(Duration.ZERO)) consumers.foreach(_.close(Duration.ofMillis(0))) adminClients.foreach(_.close()) TestUtils.shutdownServers(servers) @@ -1470,7 +1470,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet override def doWork(): Unit = { try { while (isRunning || (lastReceived != producerThread.lastSent && System.currentTimeMillis < endTimeMs)) { - val records = consumer.poll(50) + val records = consumer.poll(Duration.ofMillis(50L)) received += records.count if (!records.isEmpty) { lastBatch = records