Repository: flink Updated Branches: refs/heads/master bf4eed144 -> 51b7ede28
[FLINK-4422] [kafka] Convert all time interval measurements to System.nanoTime() This closes #3422. This closes #3421. This closes #3420. This closes #3419. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51b7ede2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51b7ede2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51b7ede2 Branch: refs/heads/master Commit: 51b7ede288e06ccedfbba9f92ac361f28bb51452 Parents: bf4eed1 Author: Jin Mingjian <[email protected]> Authored: Mon Feb 27 12:58:44 2017 +0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Thu Mar 2 14:53:02 2017 +0800 ---------------------------------------------------------------------- .../connectors/kafka/KafkaTestEnvironmentImpl.java | 4 ++-- .../kafka/internals/ClosableBlockingQueue.java | 8 ++++---- .../connectors/kafka/internals/KillerWatchDog.java | 4 ++-- .../connectors/kafka/KafkaTestEnvironmentImpl.java | 4 ++-- .../connectors/kafka/KafkaTestEnvironmentImpl.java | 4 ++-- .../connectors/kafka/KafkaConsumerTestBase.java | 14 +++++++------- 6 files changed, 19 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/51b7ede2/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index bc1faaf..d27e53a 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -296,7 +296,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { } // validate that the topic has been created - final long deadline = System.currentTimeMillis() + 30000; + final long deadline = System.nanoTime() + 30_000_000_000L; do { try { if(secureMode) { @@ -321,7 +321,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { } checkZKConn.close(); } - while (System.currentTimeMillis() < deadline); + while (System.nanoTime() < deadline); fail("Test topic could not be created"); } http://git-wip-us.apache.org/repos/asf/flink/blob/51b7ede2/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java index 23ff276..e31dcac 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java @@ -355,13 +355,13 @@ public class ClosableBlockingQueue<E> { throw new IllegalArgumentException("invalid timeout"); } - final long deadline = System.currentTimeMillis() + timeoutMillis; + final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L; lock.lock(); try { while (open && elements.isEmpty() && timeoutMillis > 0) { nonEmpty.await(timeoutMillis, TimeUnit.MILLISECONDS); - timeoutMillis = deadline - System.currentTimeMillis(); + timeoutMillis = (deadline - System.nanoTime()) / 1_000_000L; } if (!open) { @@ -437,13 +437,13 @@ public class ClosableBlockingQueue<E> { throw new IllegalArgumentException("invalid timeout"); } - final long deadline = System.currentTimeMillis() + timeoutMillis; + final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L; lock.lock(); try { while (open && elements.isEmpty() && timeoutMillis > 0) { nonEmpty.await(timeoutMillis, TimeUnit.MILLISECONDS); - timeoutMillis = deadline - System.currentTimeMillis(); + timeoutMillis = (deadline - System.nanoTime()) / 1_000_000L; } if (!open) { http://git-wip-us.apache.org/repos/asf/flink/blob/51b7ede2/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java index 4d61e53..574d9f7 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java @@ -42,10 +42,10 @@ class KillerWatchDog extends Thread { @SuppressWarnings("deprecation") @Override public void run() { - final long deadline = System.currentTimeMillis() + timeout; + final long deadline = System.nanoTime() / 1_000_000L + timeout; long now; - while (toKill.isAlive() && (now = System.currentTimeMillis()) < deadline) { + while (toKill.isAlive() && (now = (System.nanoTime() / 1_000_000L)) < deadline) { try { toKill.join(deadline - now); } http://git-wip-us.apache.org/repos/asf/flink/blob/51b7ede2/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 6c2672a..643ee8e 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -283,7 +283,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { creator.close(); // validate that the topic has been created - final long deadline = System.currentTimeMillis() + 30000; + final long deadline = System.nanoTime() + 30_000_000_000L; do { try { Thread.sleep(100); @@ -296,7 +296,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { return; } } - while (System.currentTimeMillis() < deadline); + while (System.nanoTime() < deadline); fail ("Test topic could not be created"); } http://git-wip-us.apache.org/repos/asf/flink/blob/51b7ede2/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 99c11c4..c9ef6da 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -295,7 +295,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { LOG.info("Topic {} create request is successfully posted", topic); // validate that the topic has been created - final long deadline = System.currentTimeMillis() + Integer.parseInt(zkTimeout); + final long deadline = System.nanoTime() + Integer.parseInt(zkTimeout) * 1_000_000L; do { try { if(secureMode) { @@ -325,7 +325,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { LOG.info("topic {} has not been created yet. Will check again...", topic); checkZKConn.close(); } - while (System.currentTimeMillis() < deadline); + while (System.nanoTime() < deadline); fail("Test topic could not be created"); } http://git-wip-us.apache.org/repos/asf/flink/blob/51b7ede2/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index cb8b0d0..580c507 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -233,7 +233,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { runner.start(); final Long l50 = 50L; // the final committed offset in Kafka should be 50 - final long deadline = 30000 + System.currentTimeMillis(); + final long deadline = 30_000_000_000L + System.nanoTime(); KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(); @@ -248,7 +248,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { Thread.sleep(100); } - while (System.currentTimeMillis() < deadline); + while (System.nanoTime() < deadline); // cancel the job JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout)); @@ -406,7 +406,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(); final Long l50 = 50L; // the final committed offset in Kafka should be 50 - final long deadline = 30000 + System.currentTimeMillis(); + final long deadline = 30_000_000_000L + System.nanoTime(); do { Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0); Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1); @@ -418,7 +418,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { Thread.sleep(100); } - while (System.currentTimeMillis() < deadline); + while (System.nanoTime() < deadline); // cancel the job JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout)); @@ -2018,10 +2018,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { }; runner.start(); - final long deadline = System.currentTimeMillis() + 10000; + final long deadline = System.nanoTime() + 10_000_000_000L; long delay; - while (runner.isAlive() && (delay = deadline - System.currentTimeMillis()) > 0) { - runner.join(delay); + while (runner.isAlive() && (delay = deadline - System.nanoTime()) > 0) { + runner.join(delay/1_000_000L); } boolean success;
