Repository: apex-malhar Updated Branches: refs/heads/master 18e49df62 -> 28b89176c
APEXMALHAR-2329 #resolve #comment ManagedState benchmark should not use constant bucket Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/28b89176 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/28b89176 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/28b89176 Branch: refs/heads/master Commit: 28b89176ce5468f04371dd19d9e49fe030f44248 Parents: 18e49df Author: brightchen <[email protected]> Authored: Mon Nov 7 22:07:41 2016 -0800 Committer: brightchen <[email protected]> Committed: Wed Nov 9 13:23:19 2016 -0800 ---------------------------------------------------------------------- .../benchmark/state/StoreOperator.java | 86 +++++++++++++------- .../src/main/resources/META-INF/properties.xml | 2 +- .../state/ManagedStateBenchmarkAppTest.java | 6 +- 3 files changed, 62 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28b89176/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java index 2748c29..ad92b60 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java @@ -42,27 +42,28 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo { private static final Logger logger = LoggerFactory.getLogger(StoreOperator.class); - public static enum ExecMode + public enum ExecMode { INSERT, - UPDATESYNC, - UPDATEASYNC + UPDATE_SYNC, + UPDATE_ASYNC, + GET_SYNC, + DO_NOTHING } - protected static final int numOfWindowPerStatistics = 10; + private static final int numOfWindowPerStatistics = 120; //this is the store we are going to use - protected ManagedTimeUnifiedStateImpl store; - protected long bucketId = 1; + private ManagedTimeUnifiedStateImpl store; - protected long lastCheckPointWindowId = -1; - protected long currentWindowId; - protected long tupleCount = 0; - protected int windowCountPerStatistics = 0; - protected long statisticsBeginTime = 0; + private long lastCheckPointWindowId = -1; + private long currentWindowId; + private long tupleCount = 0; + private int windowCountPerStatistics = 0; + private long statisticsBeginTime = 0; - protected ExecMode execMode = ExecMode.INSERT; - protected int timeRange = 1000 * 60; + private ExecMode execMode = ExecMode.INSERT; + private int timeRange = 1000 * 60; public final transient DefaultInputPort<KeyValPair<byte[], byte[]>> input = new DefaultInputPort<KeyValPair<byte[], byte[]>>() { @@ -81,6 +82,11 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo } @Override + public void teardown() + { + } + + @Override public void beginWindow(long windowId) { currentWindowId = windowId; @@ -100,45 +106,70 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo } } - protected transient Queue<Future<Slice>> taskQueue = new LinkedList<Future<Slice>>(); - protected transient Map<Future<Slice>, KeyValPair<byte[], byte[]>> taskToPair = Maps.newHashMap(); + private transient Queue<Future<Slice>> taskQueue = new LinkedList<Future<Slice>>(); + private transient Map<Future<Slice>, KeyValPair<byte[], byte[]>> taskToPair = Maps.newHashMap(); /** * we verify 3 type of operation * @param tuple */ - protected void processTuple(KeyValPair<byte[], byte[]> tuple) + private Slice keySliceForRead = new Slice(null, 0, 0); + private void processTuple(KeyValPair<byte[], byte[]> tuple) { switch (execMode) { - case UPDATEASYNC: + case UPDATE_ASYNC: //handle it specially updateAsync(tuple); break; - case UPDATESYNC: - store.getSync(getTimeByKey(tuple.getKey()), new Slice(tuple.getKey())); + + case UPDATE_SYNC: + keySliceForRead.buffer = tuple.getKey(); + keySliceForRead.offset = 0; + keySliceForRead.length = tuple.getKey().length; + store.getSync(getTimeByKey(tuple.getKey()), keySliceForRead); + insertValueToStore(tuple); break; + case GET_SYNC: + store.getSync(getTimeByKey(tuple.getKey()), new Slice(tuple.getKey())); + break; + + case DO_NOTHING: + break; + default: //insert insertValueToStore(tuple); } + + ++tupleCount; } - protected long getTimeByKey(byte[] key) + private transient long sameKeyCount = 0; + private transient long preKey = -1; + private long getTimeByKey(byte[] key) { long lKey = ByteBuffer.wrap(key).getLong(); - return lKey - (lKey % timeRange); + lKey = lKey - (lKey % timeRange); + if (preKey == lKey) { + sameKeyCount++; + } else { + logger.info("key: {} count: {}", preKey, sameKeyCount); + preKey = lKey; + sameKeyCount = 1; + } + return lKey; } // give a barrier to avoid used up memory - protected final int taskBarrier = 100000; + private final int taskBarrier = 100000; /** * This method first send request of get to the state manager, then handle all the task(get) which already done and update the value. * @param tuple */ - protected void updateAsync(KeyValPair<byte[], byte[]> tuple) + private void updateAsync(KeyValPair<byte[], byte[]> tuple) { if (taskQueue.size() > taskBarrier) { //slow down to avoid too much task waiting. @@ -172,13 +203,12 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo } } - protected void insertValueToStore(KeyValPair<byte[], byte[]> tuple) + private void insertValueToStore(KeyValPair<byte[], byte[]> tuple) { Slice key = new Slice(tuple.getKey()); Slice value = new Slice(tuple.getValue()); - store.put(bucketId, key, value); - ++tupleCount; + store.put(System.currentTimeMillis(), key, value); } @Override @@ -209,10 +239,10 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo this.store = store; } - protected void logStatistics() + private void logStatistics() { long spentTime = System.currentTimeMillis() - statisticsBeginTime; - logger.info("Time Spent: {}, Processed tuples: {}, rate per second: {}", spentTime, tupleCount, tupleCount * 1000 / spentTime); + logger.info("Windows: {}; Time Spent: {}, Processed tuples: {}, rate per second: {}", windowCountPerStatistics, spentTime, tupleCount, tupleCount * 1000 / spentTime); statisticsBeginTime = System.currentTimeMillis(); tupleCount = 0; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28b89176/benchmark/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/benchmark/src/main/resources/META-INF/properties.xml b/benchmark/src/main/resources/META-INF/properties.xml index b6131e4..aec92d4 100644 --- a/benchmark/src/main/resources/META-INF/properties.xml +++ b/benchmark/src/main/resources/META-INF/properties.xml @@ -195,7 +195,7 @@ </property> <property> <name>dt.application.ManagedStateBenchmark.operator.Store.execModeStr</name> - <!-- valid value are: INSERT, UPDATESYNC, UPDATEASYNC --> + <!-- valid value are: INSERT, UPDATE_SYNC, UPDATE_ASYNC --> <value>INSERT</value> </property> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28b89176/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java index 4792843..5279d36 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java @@ -49,13 +49,13 @@ public class ManagedStateBenchmarkAppTest extends ManagedStateBenchmarkApp @Test public void testUpdateSync() throws Exception { - test(ExecMode.UPDATESYNC); + test(ExecMode.UPDATE_SYNC); } @Test public void testUpdateAsync() throws Exception { - test(ExecMode.UPDATEASYNC); + test(ExecMode.UPDATE_ASYNC); } @Test @@ -72,7 +72,7 @@ public class ManagedStateBenchmarkAppTest extends ManagedStateBenchmarkApp DAG dag = lma.getDAG(); super.populateDAG(dag, conf); - storeOperator.execMode = exeMode; + storeOperator.setExecMode(exeMode); StreamingApplication app = new StreamingApplication() {
