Repository: kafka Updated Branches: refs/heads/trunk b380a82d5 -> ef92bb4e0
MINOR: Minor reduce unnecessary calls to time.millisecond (part 2) Avoid calling time.milliseconds more often than necessary. Cleaning and committing logic can use the timestamp at the start of the loop with minimal consequences. 5-10% improvements noticed with request rates of 450K records/second. Also tidy up benchmark code a bit more. Author: Eno Thereska <[email protected]> Author: Eno Thereska <[email protected]> Reviewers: Matthias J. Sax, Damian Guy, Guozhang Wang Closes #2603 from enothereska/minor-reduce-milliseconds2 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ef92bb4e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ef92bb4e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ef92bb4e Branch: refs/heads/trunk Commit: ef92bb4e00da10728cf74c2d81f8f2bbec4c9c02 Parents: b380a82 Author: Eno Thereska <[email protected]> Authored: Wed Mar 1 14:36:08 2017 -0800 Committer: Guozhang Wang <[email protected]> Committed: Wed Mar 1 14:36:08 2017 -0800 ---------------------------------------------------------------------- .../processor/internals/StreamThread.java | 10 +++---- .../kafka/streams/perf/SimpleBenchmark.java | 26 +++++++----------- .../processor/internals/StreamThreadTest.java | 28 ++++++++++---------- 3 files changed, 28 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92bb4e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 7f48200..033dc73 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -610,10 +610,10 @@ public class StreamThread extends Thread { } else { requiresPoll = true; } - maybeCommit(); + maybeCommit(timerStartedMs); maybeUpdateStandbyTasks(); - maybeClean(); + maybeClean(timerStartedMs); } log.info("{} Shutting down at user request", logPrefix); } @@ -682,8 +682,7 @@ public class StreamThread extends Thread { /** * Commit all tasks owned by this thread if specified interval time has elapsed */ - protected void maybeCommit() { - long now = time.milliseconds(); + protected void maybeCommit(final long now) { if (commitTimeMs >= 0 && lastCommitMs + commitTimeMs < now) { log.info("{} Committing all tasks because the commit interval {}ms has elapsed", logPrefix, commitTimeMs); @@ -698,8 +697,7 @@ public class StreamThread extends Thread { /** * Cleanup any states of the tasks that have been removed from this thread */ - protected void maybeClean() { - long now = time.milliseconds(); + protected void maybeClean(final long now) { if (now > lastCleanMs + cleanTimeMs) { stateDirectory.cleanRemovedTasks(cleanTimeMs); http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92bb4e/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java index dc1bdf5..cf593e2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java +++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java @@ -141,7 +141,7 @@ public class SimpleBenchmark { kTableKTableJoin(JOIN_TOPIC_1_PREFIX + "KTableKTable", JOIN_TOPIC_2_PREFIX + "KTableKTable"); break; case "produce": - produce(SOURCE_TOPIC, VALUE_SIZE, "simple-benchmark-produce", numRecords, true, numRecords, true); + produce(SOURCE_TOPIC); break; case "consume": consume(SOURCE_TOPIC); @@ -444,7 +444,6 @@ public class SimpleBenchmark { return; } produce(topic, VALUE_SIZE, "simple-benchmark-produce", numRecords, true, numRecords, true); - } /** * Produce values to a topic @@ -458,9 +457,10 @@ public class SimpleBenchmark { * when this produce step is part of another benchmark that produces its own stats */ private void produce(String topic, int valueSizeBytes, String clientId, int numRecords, boolean sequential, - int upperRange, boolean printStats) throws Exception { - + int upperRange, boolean printStats) throws Exception { + processedRecords = 0; + processedBytes = 0; if (sequential) { if (upperRange < numRecords) throw new Exception("UpperRange must be >= numRecords"); } @@ -486,17 +486,15 @@ public class SimpleBenchmark { producer.send(new ProducerRecord<>(topic, key, value)); if (sequential) key++; else key = rand.nextInt(upperRange); + processedRecords++; + processedBytes += value.length + Integer.SIZE; } producer.close(); long endTime = System.currentTimeMillis(); if (printStats) { - System.out.println("Producer Performance [records/latency/rec-sec/MB-sec write]: " + - numRecords + "/" + - (endTime - startTime) + "/" + - recordsPerSec(endTime - startTime, numRecords) + "/" + - megabytesPerSec(endTime - startTime, numRecords * valueSizeBytes)); + printResults("Producer Performance [records/latency/rec-sec/MB-sec write]: ", endTime - startTime); } } @@ -540,11 +538,7 @@ public class SimpleBenchmark { long endTime = System.currentTimeMillis(); consumer.close(); - System.out.println("Consumer Performance [records/latency/rec-sec/MB-sec read]: " + - processedRecords + "/" + - (endTime - startTime) + "/" + - recordsPerSec(endTime - startTime, processedRecords) + "/" + - megabytesPerSec(endTime - startTime, processedBytes)); + printResults("Consumer Performance [records/latency/rec-sec/MB-sec read]: ", endTime - startTime); } private KafkaStreams createKafkaStreams(String topic, final CountDownLatch latch) { @@ -734,11 +728,11 @@ public class SimpleBenchmark { } private double megabytesPerSec(long time, long processedBytes) { - return ((double) processedBytes / 1024 / 1024) / (time / 1000.0); + return (processedBytes / 1024.0 / 1024.0) / (time / 1000.0); } private double recordsPerSec(long time, int numRecords) { - return (double) numRecords / ((double) time / 1000); + return numRecords / (time / 1000.0); } private List<TopicPartition> getAllPartitions(KafkaConsumer<?, ?> consumer, String... topics) { http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92bb4e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 87b30b2..e36a236 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -485,7 +485,7 @@ public class StreamThreadTest { stateDir3.mkdir(); extraDir.mkdir(); - MockTime mockTime = new MockTime(); + final MockTime mockTime = new MockTime(); TopologyBuilder builder = new TopologyBuilder().setApplicationId("X"); builder.addSource("source1", "topic1"); @@ -495,8 +495,8 @@ public class StreamThreadTest { 0) { @Override - public void maybeClean() { - super.maybeClean(); + public void maybeClean(long now) { + super.maybeClean(now); } @Override @@ -547,7 +547,7 @@ public class StreamThreadTest { // all directories should still exit before the cleanup delay time mockTime.sleep(cleanupDelay - 10L); - thread.maybeClean(); + thread.maybeClean(mockTime.milliseconds()); assertTrue(stateDir1.exists()); assertTrue(stateDir2.exists()); assertTrue(stateDir3.exists()); @@ -555,7 +555,7 @@ public class StreamThreadTest { // all state directories except for task task2 & task3 will be removed. the extra directory should still exists mockTime.sleep(11L); - thread.maybeClean(); + thread.maybeClean(mockTime.milliseconds()); assertTrue(stateDir1.exists()); assertTrue(stateDir2.exists()); assertFalse(stateDir3.exists()); @@ -585,7 +585,7 @@ public class StreamThreadTest { // all state directories for task task1 & task2 still exist before the cleanup delay time mockTime.sleep(cleanupDelay - 10L); - thread.maybeClean(); + thread.maybeClean(mockTime.milliseconds()); assertTrue(stateDir1.exists()); assertTrue(stateDir2.exists()); assertFalse(stateDir3.exists()); @@ -593,7 +593,7 @@ public class StreamThreadTest { // all state directories for task task1 & task2 are removed mockTime.sleep(11L); - thread.maybeClean(); + thread.maybeClean(mockTime.milliseconds()); assertFalse(stateDir1.exists()); assertFalse(stateDir2.exists()); assertFalse(stateDir3.exists()); @@ -615,7 +615,7 @@ public class StreamThreadTest { StreamsConfig config = new StreamsConfig(props); - MockTime mockTime = new MockTime(); + final MockTime mockTime = new MockTime(); TopologyBuilder builder = new TopologyBuilder().setApplicationId("X"); builder.addSource("source1", "topic1"); @@ -625,8 +625,8 @@ public class StreamThreadTest { 0) { @Override - public void maybeCommit() { - super.maybeCommit(); + public void maybeCommit(long now) { + super.maybeCommit(now); } @Override @@ -657,14 +657,14 @@ public class StreamThreadTest { // no task is committed before the commit interval mockTime.sleep(commitInterval - 10L); - thread.maybeCommit(); + thread.maybeCommit(mockTime.milliseconds()); for (StreamTask task : thread.tasks().values()) { assertFalse(((TestStreamTask) task).committed); } // all tasks are committed after the commit interval mockTime.sleep(11L); - thread.maybeCommit(); + thread.maybeCommit(mockTime.milliseconds()); for (StreamTask task : thread.tasks().values()) { assertTrue(((TestStreamTask) task).committed); ((TestStreamTask) task).committed = false; @@ -672,14 +672,14 @@ public class StreamThreadTest { // no task is committed before the commit interval, again mockTime.sleep(commitInterval - 10L); - thread.maybeCommit(); + thread.maybeCommit(mockTime.milliseconds()); for (StreamTask task : thread.tasks().values()) { assertFalse(((TestStreamTask) task).committed); } // all tasks are committed after the commit interval, again mockTime.sleep(11L); - thread.maybeCommit(); + thread.maybeCommit(mockTime.milliseconds()); for (StreamTask task : thread.tasks().values()) { assertTrue(((TestStreamTask) task).committed); ((TestStreamTask) task).committed = false;
