Repository: kafka
Updated Branches:
  refs/heads/trunk b380a82d5 -> ef92bb4e0


MINOR: Reduce unnecessary calls to time.milliseconds (part 2)

Avoid calling time.milliseconds() more often than necessary. The cleaning and
committing logic can reuse the timestamp taken at the start of the loop with
minimal consequences. An improvement of 5-10% was observed at request rates of
450K records/second.

Also tidy up benchmark code a bit more.

Author: Eno Thereska <[email protected]>
Author: Eno Thereska <[email protected]>

Reviewers: Matthias J. Sax, Damian Guy, Guozhang Wang

Closes #2603 from enothereska/minor-reduce-milliseconds2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ef92bb4e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ef92bb4e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ef92bb4e

Branch: refs/heads/trunk
Commit: ef92bb4e00da10728cf74c2d81f8f2bbec4c9c02
Parents: b380a82
Author: Eno Thereska <[email protected]>
Authored: Wed Mar 1 14:36:08 2017 -0800
Committer: Guozhang Wang <[email protected]>
Committed: Wed Mar 1 14:36:08 2017 -0800

----------------------------------------------------------------------
 .../processor/internals/StreamThread.java       | 10 +++----
 .../kafka/streams/perf/SimpleBenchmark.java     | 26 +++++++-----------
 .../processor/internals/StreamThreadTest.java   | 28 ++++++++++----------
 3 files changed, 28 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92bb4e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 7f48200..033dc73 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -610,10 +610,10 @@ public class StreamThread extends Thread {
             } else {
                 requiresPoll = true;
             }
-            maybeCommit();
+            maybeCommit(timerStartedMs);
             maybeUpdateStandbyTasks();
 
-            maybeClean();
+            maybeClean(timerStartedMs);
         }
         log.info("{} Shutting down at user request", logPrefix);
     }
@@ -682,8 +682,7 @@ public class StreamThread extends Thread {
     /**
      * Commit all tasks owned by this thread if specified interval time has 
elapsed
      */
-    protected void maybeCommit() {
-        long now = time.milliseconds();
+    protected void maybeCommit(final long now) {
 
         if (commitTimeMs >= 0 && lastCommitMs + commitTimeMs < now) {
             log.info("{} Committing all tasks because the commit interval {}ms 
has elapsed", logPrefix, commitTimeMs);
@@ -698,8 +697,7 @@ public class StreamThread extends Thread {
     /**
      * Cleanup any states of the tasks that have been removed from this thread
      */
-    protected void maybeClean() {
-        long now = time.milliseconds();
+    protected void maybeClean(final long now) {
 
         if (now > lastCleanMs + cleanTimeMs) {
             stateDirectory.cleanRemovedTasks(cleanTimeMs);

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92bb4e/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java 
b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index dc1bdf5..cf593e2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -141,7 +141,7 @@ public class SimpleBenchmark {
                 kTableKTableJoin(JOIN_TOPIC_1_PREFIX + "KTableKTable", 
JOIN_TOPIC_2_PREFIX + "KTableKTable");
                 break;
             case "produce":
-                produce(SOURCE_TOPIC, VALUE_SIZE, "simple-benchmark-produce", 
numRecords, true, numRecords, true);
+                produce(SOURCE_TOPIC);
                 break;
             case "consume":
                 consume(SOURCE_TOPIC);
@@ -444,7 +444,6 @@ public class SimpleBenchmark {
             return;
         }
         produce(topic, VALUE_SIZE, "simple-benchmark-produce", numRecords, 
true, numRecords, true);
-
     }
     /**
      * Produce values to a topic
@@ -458,9 +457,10 @@ public class SimpleBenchmark {
      *                   when this produce step is part of another benchmark 
that produces its own stats
      */
     private void produce(String topic, int valueSizeBytes, String clientId, 
int numRecords, boolean sequential,
-                        int upperRange, boolean printStats) throws Exception {
-
+                         int upperRange, boolean printStats) throws Exception {
 
+        processedRecords = 0;
+        processedBytes = 0;
         if (sequential) {
             if (upperRange < numRecords) throw new Exception("UpperRange must 
be >= numRecords");
         }
@@ -486,17 +486,15 @@ public class SimpleBenchmark {
             producer.send(new ProducerRecord<>(topic, key, value));
             if (sequential) key++;
             else key = rand.nextInt(upperRange);
+            processedRecords++;
+            processedBytes += value.length + Integer.SIZE;
         }
         producer.close();
 
         long endTime = System.currentTimeMillis();
 
         if (printStats) {
-            System.out.println("Producer Performance 
[records/latency/rec-sec/MB-sec write]: " +
-                numRecords + "/" +
-                (endTime - startTime) + "/" +
-                recordsPerSec(endTime - startTime, numRecords) + "/" +
-                megabytesPerSec(endTime - startTime, numRecords * 
valueSizeBytes));
+            printResults("Producer Performance [records/latency/rec-sec/MB-sec 
write]: ", endTime - startTime);
         }
     }
 
@@ -540,11 +538,7 @@ public class SimpleBenchmark {
         long endTime = System.currentTimeMillis();
 
         consumer.close();
-        System.out.println("Consumer Performance 
[records/latency/rec-sec/MB-sec read]: " +
-            processedRecords + "/" +
-            (endTime - startTime) + "/" +
-            recordsPerSec(endTime - startTime, processedRecords) + "/" +
-            megabytesPerSec(endTime - startTime, processedBytes));
+        printResults("Consumer Performance [records/latency/rec-sec/MB-sec 
read]: ", endTime - startTime);
     }
 
     private KafkaStreams createKafkaStreams(String topic, final CountDownLatch 
latch) {
@@ -734,11 +728,11 @@ public class SimpleBenchmark {
     }
 
     private double megabytesPerSec(long time, long processedBytes) {
-        return  ((double) processedBytes / 1024 / 1024) / (time / 1000.0);
+        return  (processedBytes / 1024.0 / 1024.0) / (time / 1000.0);
     }
 
     private double recordsPerSec(long time, int numRecords) {
-        return (double) numRecords / ((double) time / 1000);
+        return numRecords / (time / 1000.0);
     }
 
     private List<TopicPartition> getAllPartitions(KafkaConsumer<?, ?> 
consumer, String... topics) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92bb4e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 87b30b2..e36a236 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -485,7 +485,7 @@ public class StreamThreadTest {
             stateDir3.mkdir();
             extraDir.mkdir();
 
-            MockTime mockTime = new MockTime();
+            final MockTime mockTime = new MockTime();
 
             TopologyBuilder builder = new 
TopologyBuilder().setApplicationId("X");
             builder.addSource("source1", "topic1");
@@ -495,8 +495,8 @@ public class StreamThreadTest {
                                                    0) {
 
                 @Override
-                public void maybeClean() {
-                    super.maybeClean();
+                public void maybeClean(long now) {
+                    super.maybeClean(now);
                 }
 
                 @Override
@@ -547,7 +547,7 @@ public class StreamThreadTest {
 
             // all directories should still exit before the cleanup delay time
             mockTime.sleep(cleanupDelay - 10L);
-            thread.maybeClean();
+            thread.maybeClean(mockTime.milliseconds());
             assertTrue(stateDir1.exists());
             assertTrue(stateDir2.exists());
             assertTrue(stateDir3.exists());
@@ -555,7 +555,7 @@ public class StreamThreadTest {
 
             // all state directories except for task task2 & task3 will be 
removed. the extra directory should still exists
             mockTime.sleep(11L);
-            thread.maybeClean();
+            thread.maybeClean(mockTime.milliseconds());
             assertTrue(stateDir1.exists());
             assertTrue(stateDir2.exists());
             assertFalse(stateDir3.exists());
@@ -585,7 +585,7 @@ public class StreamThreadTest {
 
             // all state directories for task task1 & task2 still exist before 
the cleanup delay time
             mockTime.sleep(cleanupDelay - 10L);
-            thread.maybeClean();
+            thread.maybeClean(mockTime.milliseconds());
             assertTrue(stateDir1.exists());
             assertTrue(stateDir2.exists());
             assertFalse(stateDir3.exists());
@@ -593,7 +593,7 @@ public class StreamThreadTest {
 
             // all state directories for task task1 & task2 are removed
             mockTime.sleep(11L);
-            thread.maybeClean();
+            thread.maybeClean(mockTime.milliseconds());
             assertFalse(stateDir1.exists());
             assertFalse(stateDir2.exists());
             assertFalse(stateDir3.exists());
@@ -615,7 +615,7 @@ public class StreamThreadTest {
 
             StreamsConfig config = new StreamsConfig(props);
 
-            MockTime mockTime = new MockTime();
+            final MockTime mockTime = new MockTime();
 
             TopologyBuilder builder = new 
TopologyBuilder().setApplicationId("X");
             builder.addSource("source1", "topic1");
@@ -625,8 +625,8 @@ public class StreamThreadTest {
                                                    0) {
 
                 @Override
-                public void maybeCommit() {
-                    super.maybeCommit();
+                public void maybeCommit(long now) {
+                    super.maybeCommit(now);
                 }
 
                 @Override
@@ -657,14 +657,14 @@ public class StreamThreadTest {
 
             // no task is committed before the commit interval
             mockTime.sleep(commitInterval - 10L);
-            thread.maybeCommit();
+            thread.maybeCommit(mockTime.milliseconds());
             for (StreamTask task : thread.tasks().values()) {
                 assertFalse(((TestStreamTask) task).committed);
             }
 
             // all tasks are committed after the commit interval
             mockTime.sleep(11L);
-            thread.maybeCommit();
+            thread.maybeCommit(mockTime.milliseconds());
             for (StreamTask task : thread.tasks().values()) {
                 assertTrue(((TestStreamTask) task).committed);
                 ((TestStreamTask) task).committed = false;
@@ -672,14 +672,14 @@ public class StreamThreadTest {
 
             // no task is committed before the commit interval, again
             mockTime.sleep(commitInterval - 10L);
-            thread.maybeCommit();
+            thread.maybeCommit(mockTime.milliseconds());
             for (StreamTask task : thread.tasks().values()) {
                 assertFalse(((TestStreamTask) task).committed);
             }
 
             // all tasks are committed after the commit interval, again
             mockTime.sleep(11L);
-            thread.maybeCommit();
+            thread.maybeCommit(mockTime.milliseconds());
             for (StreamTask task : thread.tasks().values()) {
                 assertTrue(((TestStreamTask) task).committed);
                 ((TestStreamTask) task).committed = false;

Reply via email to