Repository: apex-malhar Updated Branches: refs/heads/master 5ae58d039 -> ea1b58f72
APEXMALHAR-2317 #resolve #comment Change SpillableBenchmarkApp to adapt the change on Spillable Data Structure Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/ea1b58f7 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/ea1b58f7 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/ea1b58f7 Branch: refs/heads/master Commit: ea1b58f72e69ce0e1bfd35eb8b4b6756b1a5e0a0 Parents: 5ae58d0 Author: brightchen <[email protected]> Authored: Wed Oct 26 11:28:34 2016 -0700 Committer: brightchen <[email protected]> Committed: Wed Oct 26 14:56:35 2016 -0700 ---------------------------------------------------------------------- .../spillable/SpillableTestInputOperator.java | 2 +- .../spillable/SpillableTestOperator.java | 19 +++++++++++++++++++ .../spillable/SpillableBenchmarkAppTester.java | 13 ++++++++++++- 3 files changed, 32 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ea1b58f7/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java index 2e33721..c3eafb0 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java @@ -27,7 +27,7 @@ public class SpillableTestInputOperator extends BaseOperator implements InputOpe public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>(); public long count = 0; public int batchSize = 100; - public int sleepBetweenBatch = 1; + public int sleepBetweenBatch = 0; @Override public void emitTuples() http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ea1b58f7/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java index 3c5bf71..7c87b93 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java @@ -59,6 +59,9 @@ public class SpillableTestOperator extends BaseOperator implements Operator.Chec public static Throwable errorTrace; + private long lastLogTime; + private long beginTime; + public final transient DefaultInputPort<String> input = new DefaultInputPort<String>() { @Override @@ -89,8 +92,12 @@ public class SpillableTestOperator extends BaseOperator implements Operator.Chec } store.setup(context); + windowToCount.setup(context); multiMap.setup(context); + lastLogTime = System.currentTimeMillis(); + beginTime = lastLogTime; + checkData(); } @@ -148,9 +155,21 @@ public class SpillableTestOperator extends BaseOperator implements Operator.Chec if (windowId % 10 == 0) { checkData(); + logStatistics(); } } + private long lastTotalCount = 0; + + public void logStatistics() + { + long countInPeriod = totalCount - lastTotalCount; + long timeInPeriod = System.currentTimeMillis() - lastLogTime; + long totalTime = System.currentTimeMillis() - beginTime; + logger.info("Statistics: total count: {}; period count: {}; total rate (per second): {}; period rate (per second): {}", + totalCount, countInPeriod, totalCount * 1000 / totalTime, countInPeriod * 1000 / timeInPeriod); + } + @Override public void beforeCheckpoint(long windowId) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ea1b58f7/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java index 7f94079..cd2c640 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java @@ -18,12 +18,16 @@ */ package com.datatorrent.benchmark.spillable; +import java.io.File; + import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; import com.datatorrent.api.DAG; import com.datatorrent.api.LocalMode; @@ -33,6 +37,13 @@ public class SpillableBenchmarkAppTester extends SpillableBenchmarkApp { private static final Logger logger = LoggerFactory.getLogger(SpillableBenchmarkAppTester.class); public static final String basePath = "target/temp"; + + @Before + public void before() + { + FileUtil.fullyDelete(new File(basePath)); + } + @Test public void test() throws Exception { @@ -55,7 +66,7 @@ public class SpillableBenchmarkAppTester extends SpillableBenchmarkApp // Create local cluster final LocalMode.Controller lc = lma.getController(); - lc.run(60000); + lc.run(600000); lc.shutdown();
