Repository: apex-malhar Updated Branches: refs/heads/master 9f9da0ee1 -> b6c48bb30
APEXMALHAR-2205 #resolve #comment State management benchmark - add update Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/b6c48bb3 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/b6c48bb3 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/b6c48bb3 Branch: refs/heads/master Commit: b6c48bb30bc1ba58ddb547c9c6bcad1ef3547a38 Parents: 9f9da0e Author: brightchen <bri...@datatorrent.com> Authored: Wed Sep 14 15:21:35 2016 -0700 Committer: brightchen <bri...@datatorrent.com> Committed: Thu Sep 15 16:31:37 2016 -0700 ---------------------------------------------------------------------- .../state/ManagedStateBenchmarkApp.java | 21 +-- .../benchmark/state/StoreOperator.java | 141 ++++++++++++++++++- .../src/main/resources/META-INF/properties.xml | 5 + .../state/ManagedStateBenchmarkAppTester.java | 37 ++++- 4 files changed, 186 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b6c48bb3/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java index 25e3971..7d9c3ba 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java @@ -26,7 +26,7 @@ import java.util.Random; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.apex.malhar.lib.state.managed.ManagedStateImpl; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; import org.apache.hadoop.conf.Configuration; import com.google.common.collect.Lists; @@ 
-48,32 +48,35 @@ import com.datatorrent.lib.util.KeyValPair;
 @ApplicationAnnotation(name = "ManagedStateBenchmark")
 public class ManagedStateBenchmarkApp implements StreamingApplication
 {
-  private static final Logger logger = LoggerFactory.getLogger(ManagedStateBenchmarkApp.class);
-
   protected static final String PROP_STORE_PATH = "dt.application.ManagedStateBenchmark.storeBasePath";
   protected static final String DEFAULT_BASE_PATH = "ManagedStateBenchmark/Store";
 
+  protected StoreOperator storeOperator;
+  protected int timeRange = 1000 * 60; // one minute range of hot keys
+
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
     TestStatsListener sl = new TestStatsListener();
     sl.adjustRate = conf.getBoolean("dt.ManagedStateBenchmark.adjustRate", false);
     TestGenerator gen = dag.addOperator("Generator", new TestGenerator());
+    gen.setRange(timeRange);
     dag.setAttribute(gen, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl));
 
-    StoreOperator storeOperator = new StoreOperator();
+    storeOperator = new StoreOperator();
     storeOperator.setStore(createStore(conf));
-    StoreOperator store = dag.addOperator("Store", storeOperator);
+    storeOperator.setTimeRange(timeRange);
+    storeOperator = dag.addOperator("Store", storeOperator);
 
-    dag.setAttribute(store, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl));
+    dag.setAttribute(storeOperator, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl));
 
-    dag.addStream("Events", gen.data, store.input).setLocality(Locality.CONTAINER_LOCAL);
+    dag.addStream("Events", gen.data, storeOperator.input).setLocality(Locality.CONTAINER_LOCAL);
   }
 
-  public ManagedStateImpl createStore(Configuration conf)
+  public ManagedTimeUnifiedStateImpl createStore(Configuration conf)
   {
     String basePath = getStoreBasePath(conf);
-    ManagedStateImpl store = new ManagedStateImpl();
+    ManagedTimeUnifiedStateImpl store = new ManagedTimeUnifiedStateImpl();
     ((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath(basePath);
     return store;
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b6c48bb3/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
index 0d9c42b..2530611 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
@@ -18,10 +18,18 @@
  */
 package com.datatorrent.benchmark.state;
 
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.Future;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.apex.malhar.lib.state.managed.ManagedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+
+import com.google.common.collect.Maps;
 
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
@@ -34,9 +42,17 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
 {
   private static final Logger logger = LoggerFactory.getLogger(StoreOperator.class);
 
+  public static enum ExecMode
+  {
+    INSERT,
+    UPDATESYNC,
+    UPDATEASYNC
+  }
+
   protected static final int numOfWindowPerStatistics = 10;
 
-  protected ManagedStateImpl store;
+  //this is the store we are going to use
+  protected ManagedTimeUnifiedStateImpl store;
   protected long bucketId = 1;
 
   protected long lastCheckPointWindowId = -1;
@@ -44,7 +60,10 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
   protected long tupleCount = 0;
   protected int windowCountPerStatistics = 0;
   protected long statisticsBeginTime = 0;
-  
+
+  protected ExecMode execMode = ExecMode.INSERT;
+  protected int timeRange = 1000 * 60;
+
   public final transient DefaultInputPort<KeyValPair<byte[], byte[]>> input = new DefaultInputPort<KeyValPair<byte[], byte[]>>()
   {
     @Override
@@ -57,6 +76,7 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
   @Override
   public void setup(OperatorContext context)
   {
+    logger.info("The execute mode is: {}", execMode.name());
     store.setup(context);
   }
 
@@ -80,10 +100,83 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
     }
   }
 
+  protected transient Queue<Future<Slice>> taskQueue = new LinkedList<Future<Slice>>();
+  protected transient Map<Future<Slice>, KeyValPair<byte[], byte[]>> taskToPair = Maps.newHashMap();
+
+  /**
+   * Processes one tuple according to {@link #execMode}: insert, synchronous update or asynchronous update.
+   * @param tuple the key/value pair to store
+   */
   protected void processTuple(KeyValPair<byte[], byte[]> tuple)
   {
+    switch (execMode) {
+      case UPDATEASYNC:
+        //handle it specially
+        updateAsync(tuple);
+        break;
+
+      case UPDATESYNC:
+        store.getSync(getTimeByKey(tuple.getKey()), new Slice(tuple.getKey()));
+        insertValueToStore(tuple);
+        break;
+
+      default: //insert
+        insertValueToStore(tuple);
+    }
+  }
+
+  protected long getTimeByKey(byte[] key)
+  {
+    long lKey = ByteBuffer.wrap(key).getLong();
+    return lKey - (lKey % timeRange);
+  }
+
+  // upper bound on pending async gets; beyond it updateAsync() throttles to avoid using up memory
+  protected final int taskBarrier = 100000;
+
+  /**
+   * Issues an async get for the tuple's key, then writes all queued tuples whose gets are done, in request order.
+   * @param tuple the key/value pair to update
+   */
+  protected void updateAsync(KeyValPair<byte[], byte[]> tuple)
+  {
+    if (taskQueue.size() > taskBarrier) {
+      //slow down to avoid too much task waiting.
+      try {
+
+        logger.info("Queue Size: {}, wait time(milli-seconds): {}", taskQueue.size(), taskQueue.size() / taskBarrier);
+        Thread.sleep(taskQueue.size() / taskBarrier);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
+      }
+    }
+
+    //issue the get and remember the tuple it belongs to:
+    //taskQueue keeps the requests in arrival order, taskToPair maps each request back to its tuple
+    {
+      Slice key = new Slice(tuple.getKey());
+      Future<Slice> task = store.getAsync(getTimeByKey(tuple.getKey()), key);
+      taskQueue.add(task);
+      taskToPair.put(task, tuple);
+    }
+
+    //write the tuples whose gets have finished
+    while (!taskQueue.isEmpty()) {
+      //stop at the first unfinished get so that writes are applied in request order
+      if (!taskQueue.peek().isDone()) {
+        break;
+      }
+
+      Future<Slice> task = taskQueue.poll();
+      insertValueToStore(taskToPair.remove(task));
+    }
+  }
+
+  protected void insertValueToStore(KeyValPair<byte[], byte[]> tuple)
+  {
     Slice key = new Slice(tuple.getKey());
     Slice value = new Slice(tuple.getValue());
-    store.put(bucketId, key, value);
+    // write with the same time argument the gets use, so an update targets the data it read
+    store.put(getTimeByKey(tuple.getKey()), key, value);
     ++tupleCount;
   }
@@ -106,12 +199,12 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
     logger.info("beforeCheckpoint {}", windowId);
   }
 
-  public ManagedStateImpl getStore()
+  public ManagedTimeUnifiedStateImpl getStore()
   {
     return store;
   }
 
-  public void setStore(ManagedStateImpl store)
+  public void setStore(ManagedTimeUnifiedStateImpl store)
   {
     this.store = store;
   }
@@ -119,9 +212,57 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
   protected void logStatistics()
   {
     long spentTime = System.currentTimeMillis() - statisticsBeginTime;
-    logger.info("Time Spent: {}, Processed tuples: {}, rate: {}", spentTime, tupleCount, tupleCount / spentTime);
+    // a zero-length interval would otherwise make the rate computation divide by zero
+    logger.info("Time Spent: {}, Processed tuples: {}, rate per second: {}", spentTime, tupleCount,
+        tupleCount * 1000 / Math.max(spentTime, 1));
     statisticsBeginTime = System.currentTimeMillis();
     tupleCount = 0;
   }
+
+  public ExecMode getExecMode()
+  {
+    return execMode;
+  }
+
+  public void setExecMode(ExecMode execMode)
+  {
+    this.execMode = execMode;
+  }
+
+  public String getExecModeStr()
+  {
+    return execMode.name();
+  }
+
+  /**
+   * Sets the execution mode by name. Used to apply the {@code execModeStr} operator property
+   * configured in META-INF/properties.xml, so the method name must match that property;
+   * the value is matched case-insensitively.
+   *
+   * @param execModeStr one of INSERT, UPDATESYNC or UPDATEASYNC, in any case
+   * @throws IllegalArgumentException if the value does not name an {@link ExecMode}
+   */
+  public void setExecModeStr(String execModeStr)
+  {
+    for (ExecMode em : ExecMode.values()) {
+      if (em.name().equalsIgnoreCase(execModeStr)) {
+        this.execMode = em;
+        return;
+      }
+    }
+    throw new IllegalArgumentException("Invalid execute mode: " + execModeStr
+        + "; valid values are INSERT, UPDATESYNC, UPDATEASYNC");
+  }
+
+  public int getTimeRange()
+  {
+    return timeRange;
+  }
+
+  public void setTimeRange(int timeRange)
+  {
+    this.timeRange = timeRange;
+  }
+
 
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b6c48bb3/benchmark/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/benchmark/src/main/resources/META-INF/properties.xml b/benchmark/src/main/resources/META-INF/properties.xml
index f6d0ffc..b6131e4 100644
--- a/benchmark/src/main/resources/META-INF/properties.xml
+++ b/benchmark/src/main/resources/META-INF/properties.xml
@@ -193,6 +193,11 @@
     <name>dt.application.CouchBaseAppInput.operator.couchbaseInput.store.passwordMeta</name>
     <value></value>
   </property>
+  <property>
+    <name>dt.application.ManagedStateBenchmark.operator.Store.execModeStr</name>
+    <!-- valid values are (case-insensitive): INSERT, UPDATESYNC, UPDATEASYNC -->
+    <value>INSERT</value>
+  </property>
 </configuration>
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b6c48bb3/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
index ca5e245..93a7720 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
@@ -18,13 +18,18 @@
  */
 package com.datatorrent.benchmark.state;
 
+import java.io.File;
+
+import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
 
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.LocalMode;
 import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.benchmark.state.StoreOperator.ExecMode;
 
 /**
  * This is not a really unit test, but in fact a benchmark runner.
@@ -34,9 +39,32 @@ import com.datatorrent.api.StreamingApplication;
 public class ManagedStateBenchmarkAppTester extends ManagedStateBenchmarkApp
 {
   public static final String basePath = "target/temp";
-  
+
+  @Before
+  public void before()
+  {
+    FileUtil.fullyDelete(new File(basePath));
+  }
+
+  @Test
+  public void testUpdateSync() throws Exception
+  {
+    test(ExecMode.UPDATESYNC);
+  }
+
   @Test
-  public void test() throws Exception
+  public void testUpdateAsync() throws Exception
+  {
+    test(ExecMode.UPDATEASYNC);
+  }
+
+  @Test
+  public void testInsert() throws Exception
+  {
+    test(ExecMode.INSERT);
+  }
+
+  public void test(ExecMode exeMode) throws Exception
   {
     Configuration conf = new Configuration(false);
 
@@ -44,7 +72,8 @@ public class ManagedStateBenchmarkAppTester extends ManagedStateBenchmarkApp
     DAG dag = lma.getDAG();
 
     super.populateDAG(dag, conf);
-    
+    storeOperator.execMode = exeMode;
+
     StreamingApplication app = new StreamingApplication()
     {
       @Override
@@ -62,6 +91,8 @@ public class ManagedStateBenchmarkAppTester extends ManagedStateBenchmarkApp
     lc.shutdown();
   }
 
+
+
   @Override
   public String getStoreBasePath(Configuration conf)
   {