Repository: flink
Updated Branches:
  refs/heads/master bf4eed144 -> 51b7ede28


[FLINK-4422] [kafka] Convert all time interval measurements to System.nanoTime()

This closes #3422.
This closes #3421.
This closes #3420.
This closes #3419.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51b7ede2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51b7ede2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51b7ede2

Branch: refs/heads/master
Commit: 51b7ede288e06ccedfbba9f92ac361f28bb51452
Parents: bf4eed1
Author: Jin Mingjian <[email protected]>
Authored: Mon Feb 27 12:58:44 2017 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Thu Mar 2 14:53:02 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/KafkaTestEnvironmentImpl.java    |  4 ++--
 .../kafka/internals/ClosableBlockingQueue.java        |  8 ++++----
 .../connectors/kafka/internals/KillerWatchDog.java    |  4 ++--
 .../connectors/kafka/KafkaTestEnvironmentImpl.java    |  4 ++--
 .../connectors/kafka/KafkaTestEnvironmentImpl.java    |  4 ++--
 .../connectors/kafka/KafkaConsumerTestBase.java       | 14 +++++++-------
 6 files changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/51b7ede2/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index bc1faaf..d27e53a 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -296,7 +296,7 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                }
 
                // validate that the topic has been created
-               final long deadline = System.currentTimeMillis() + 30000;
+               final long deadline = System.nanoTime() + 30_000_000_000L;
                do {
                        try {
                                if(secureMode) {
@@ -321,7 +321,7 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                        }
                        checkZKConn.close();
                }
-               while (System.currentTimeMillis() < deadline);
+               while (System.nanoTime() < deadline);
                fail("Test topic could not be created");
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/51b7ede2/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
index 23ff276..e31dcac 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
@@ -355,13 +355,13 @@ public class ClosableBlockingQueue<E> {
                        throw new IllegalArgumentException("invalid timeout");
                }
                
-               final long deadline = System.currentTimeMillis() + 
timeoutMillis;
+               final long deadline = System.nanoTime() + timeoutMillis * 
1_000_000L;
                
                lock.lock();
                try {
                        while (open && elements.isEmpty() && timeoutMillis > 0) 
{ 
                                nonEmpty.await(timeoutMillis, 
TimeUnit.MILLISECONDS);
-                               timeoutMillis = deadline - 
System.currentTimeMillis();
+                               timeoutMillis = (deadline - System.nanoTime()) 
/ 1_000_000L;
                        }
                        
                        if (!open) {
@@ -437,13 +437,13 @@ public class ClosableBlockingQueue<E> {
                        throw new IllegalArgumentException("invalid timeout");
                }
 
-               final long deadline = System.currentTimeMillis() + 
timeoutMillis;
+               final long deadline = System.nanoTime() + timeoutMillis * 
1_000_000L;
 
                lock.lock();
                try {
                        while (open && elements.isEmpty() && timeoutMillis > 0) 
{
                                nonEmpty.await(timeoutMillis, 
TimeUnit.MILLISECONDS);
-                               timeoutMillis = deadline - 
System.currentTimeMillis();
+                               timeoutMillis = (deadline - System.nanoTime()) 
/ 1_000_000L;
                        }
 
                        if (!open) {

http://git-wip-us.apache.org/repos/asf/flink/blob/51b7ede2/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
index 4d61e53..574d9f7 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
@@ -42,10 +42,10 @@ class KillerWatchDog extends Thread {
        @SuppressWarnings("deprecation")
        @Override
        public void run() {
-               final long deadline = System.currentTimeMillis() + timeout;
+               final long deadline = System.nanoTime() / 1_000_000L + timeout;
                long now;
 
-               while (toKill.isAlive() && (now = System.currentTimeMillis()) < 
deadline) {
+               while (toKill.isAlive() && (now = (System.nanoTime() / 
1_000_000L)) < deadline) {
                        try {
                                toKill.join(deadline - now);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/51b7ede2/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 6c2672a..643ee8e 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -283,7 +283,7 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                creator.close();
 
                // validate that the topic has been created
-               final long deadline = System.currentTimeMillis() + 30000;
+               final long deadline = System.nanoTime() + 30_000_000_000L;
                do {
                        try {
                                Thread.sleep(100);
@@ -296,7 +296,7 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                                return;
                        }
                }
-               while (System.currentTimeMillis() < deadline);
+               while (System.nanoTime() < deadline);
                fail ("Test topic could not be created");
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/51b7ede2/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 99c11c4..c9ef6da 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -295,7 +295,7 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                LOG.info("Topic {} create request is successfully posted", 
topic);
 
                // validate that the topic has been created
-               final long deadline = System.currentTimeMillis() + 
Integer.parseInt(zkTimeout);
+               final long deadline = System.nanoTime() + 
Integer.parseInt(zkTimeout) * 1_000_000L;
                do {
                        try {
                                if(secureMode) {
@@ -325,7 +325,7 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                        LOG.info("topic {} has not been created yet. Will check 
again...", topic);
                        checkZKConn.close();
                }
-               while (System.currentTimeMillis() < deadline);
+               while (System.nanoTime() < deadline);
                fail("Test topic could not be created");
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/51b7ede2/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index cb8b0d0..580c507 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -233,7 +233,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                runner.start();
 
                final Long l50 = 50L; // the final committed offset in Kafka 
should be 50
-               final long deadline = 30000 + System.currentTimeMillis();
+               final long deadline = 30_000_000_000L + System.nanoTime();
 
                KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler();
 
@@ -248,7 +248,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
 
                        Thread.sleep(100);
                }
-               while (System.currentTimeMillis() < deadline);
+               while (System.nanoTime() < deadline);
 
                // cancel the job
                
JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
@@ -406,7 +406,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler();
 
                final Long l50 = 50L; // the final committed offset in Kafka 
should be 50
-               final long deadline = 30000 + System.currentTimeMillis();
+               final long deadline = 30_000_000_000L + System.nanoTime();
                do {
                        Long o1 = 
kafkaOffsetHandler.getCommittedOffset(topicName, 0);
                        Long o2 = 
kafkaOffsetHandler.getCommittedOffset(topicName, 1);
@@ -418,7 +418,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
 
                        Thread.sleep(100);
                }
-               while (System.currentTimeMillis() < deadline);
+               while (System.nanoTime() < deadline);
 
                // cancel the job
                
JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
@@ -2018,10 +2018,10 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                        };
                        runner.start();
                        
-                       final long deadline = System.currentTimeMillis() + 
10000;
+                       final long deadline = System.nanoTime() + 
10_000_000_000L;
                        long delay;
-                       while (runner.isAlive() && (delay = deadline - 
System.currentTimeMillis()) > 0) {
-                               runner.join(delay);
+                       while (runner.isAlive() && (delay = deadline - 
System.nanoTime()) > 0) {
+                               runner.join(delay/1_000_000L);
                        }
                        
                        boolean success;

Reply via email to