Repository: apex-malhar
Updated Branches:
  refs/heads/master 9f9da0ee1 -> b6c48bb30


APEXMALHAR-2205 #resolve #comment State management benchmark - add update


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/b6c48bb3
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/b6c48bb3
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/b6c48bb3

Branch: refs/heads/master
Commit: b6c48bb30bc1ba58ddb547c9c6bcad1ef3547a38
Parents: 9f9da0e
Author: brightchen <bri...@datatorrent.com>
Authored: Wed Sep 14 15:21:35 2016 -0700
Committer: brightchen <bri...@datatorrent.com>
Committed: Thu Sep 15 16:31:37 2016 -0700

----------------------------------------------------------------------
 .../state/ManagedStateBenchmarkApp.java         |  21 +--
 .../benchmark/state/StoreOperator.java          | 141 ++++++++++++++++++-
 .../src/main/resources/META-INF/properties.xml  |   5 +
 .../state/ManagedStateBenchmarkAppTester.java   |  37 ++++-
 4 files changed, 186 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b6c48bb3/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
index 25e3971..7d9c3ba 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
@@ -26,7 +26,7 @@ import java.util.Random;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.apex.malhar.lib.state.managed.ManagedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
 import org.apache.hadoop.conf.Configuration;
 
 import com.google.common.collect.Lists;
@@ -48,32 +48,35 @@ import com.datatorrent.lib.util.KeyValPair;
 @ApplicationAnnotation(name = "ManagedStateBenchmark")
 public class ManagedStateBenchmarkApp implements StreamingApplication
 {
-  private static final Logger logger = LoggerFactory.getLogger(ManagedStateBenchmarkApp.class);
-
   protected static final String PROP_STORE_PATH = "dt.application.ManagedStateBenchmark.storeBasePath";
   protected static final String DEFAULT_BASE_PATH = "ManagedStateBenchmark/Store";
 
+  protected StoreOperator storeOperator;
+  protected int timeRange = 1000 * 60; // one minute range of hot keys
+  
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
     TestStatsListener sl = new TestStatsListener();
     sl.adjustRate = conf.getBoolean("dt.ManagedStateBenchmark.adjustRate", false);
     TestGenerator gen = dag.addOperator("Generator", new TestGenerator());
+    gen.setRange(timeRange);
     dag.setAttribute(gen, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl));
 
-    StoreOperator storeOperator = new StoreOperator();
+    storeOperator = new StoreOperator();
     storeOperator.setStore(createStore(conf));
-    StoreOperator store = dag.addOperator("Store", storeOperator);
+    storeOperator.setTimeRange(timeRange);
+    storeOperator = dag.addOperator("Store", storeOperator);
 
-    dag.setAttribute(store, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl));
+    dag.setAttribute(storeOperator, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl));
 
-    dag.addStream("Events", gen.data, store.input).setLocality(Locality.CONTAINER_LOCAL);
+    dag.addStream("Events", gen.data, storeOperator.input).setLocality(Locality.CONTAINER_LOCAL);
   }
 
-  public ManagedStateImpl createStore(Configuration conf)
+  public ManagedTimeUnifiedStateImpl createStore(Configuration conf)
   {
     String basePath = getStoreBasePath(conf);
-    ManagedStateImpl store = new ManagedStateImpl();
+    ManagedTimeUnifiedStateImpl store = new ManagedTimeUnifiedStateImpl();
     ((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath(basePath);
     return store;
   }
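
For context, the application above can be launched in embedded mode, much like the tester further below does. A minimal sketch follows (not part of the commit); the launcher class name, the local store path, and the run duration are illustrative assumptions, and the storeBasePath override assumes getStoreBasePath reads the property shown above:

package com.datatorrent.benchmark.state;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.LocalMode;

public class ManagedStateBenchmarkLauncher
{
  public static void main(String[] args) throws Exception
  {
    Configuration conf = new Configuration(false);
    // hypothetical local path; the name matches PROP_STORE_PATH defined above
    conf.set("dt.application.ManagedStateBenchmark.storeBasePath", "target/benchmark/store");

    LocalMode lma = LocalMode.newInstance();
    lma.prepareDAG(new ManagedStateBenchmarkApp(), conf);
    // run the benchmark embedded for about one minute, then shut down
    lma.getController().run(60000);
  }
}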

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b6c48bb3/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
index 0d9c42b..2530611 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
@@ -18,10 +18,18 @@
  */
 package com.datatorrent.benchmark.state;
 
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.Future;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.apex.malhar.lib.state.managed.ManagedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+
+import com.google.common.collect.Maps;
 
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
@@ -34,9 +42,17 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
 {
   private static final Logger logger = LoggerFactory.getLogger(StoreOperator.class);
 
+  public static enum ExecMode
+  {
+    INSERT,
+    UPDATESYNC,
+    UPDATEASYNC
+  }
+  
   protected static final int numOfWindowPerStatistics = 10;
 
-  protected ManagedStateImpl store;
+  //the state store exercised by this benchmark
+  protected ManagedTimeUnifiedStateImpl store;
   protected long bucketId = 1;
 
   protected long lastCheckPointWindowId = -1;
@@ -44,7 +60,10 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
   protected long tupleCount = 0;
   protected int windowCountPerStatistics = 0;
   protected long statisticsBeginTime = 0;
-
+  
+  protected ExecMode execMode = ExecMode.INSERT;
+  protected int timeRange = 1000 * 60;
+  
   public final transient DefaultInputPort<KeyValPair<byte[], byte[]>> input = new DefaultInputPort<KeyValPair<byte[], byte[]>>()
   {
     @Override
@@ -57,6 +76,7 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
   @Override
   public void setup(OperatorContext context)
   {
+    logger.info("The execute mode is: {}", execMode.name());
     store.setup(context);
   }
 
@@ -80,10 +100,83 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
     }
   }
 
+  protected transient Queue<Future<Slice>> taskQueue = new LinkedList<Future<Slice>>();
+  protected transient Map<Future<Slice>, KeyValPair<byte[], byte[]>> taskToPair = Maps.newHashMap();
+  
+  /**
+   * Process the tuple according to one of the three operation modes: INSERT, UPDATESYNC or UPDATEASYNC.
+   * @param tuple the key/value pair to store
+   */
   protected void processTuple(KeyValPair<byte[], byte[]> tuple)
   {
+    switch (execMode) {
+      case UPDATEASYNC:
+        //handle it specially
+        updateAsync(tuple);
+        break;
+
+      case UPDATESYNC:
+        store.getSync(getTimeByKey(tuple.getKey()), new Slice(tuple.getKey()));
+        insertValueToStore(tuple);
+        break;
+        
+      default: //insert
+        insertValueToStore(tuple);
+    }
+  }
+  
+  protected long getTimeByKey(byte[] key)
+  {
+    long lKey = ByteBuffer.wrap(key).getLong();
+    return lKey - (lKey % timeRange);
+  }
+  
+  // upper bound on outstanding get tasks, to avoid exhausting memory
+  protected final int taskBarrier = 100000;
+  
+  /**
+   * First send an asynchronous get request to the state manager, then handle all get tasks that have already completed and write back their updated values.
+   * @param tuple the key/value pair to update
+   */
+  protected void updateAsync(KeyValPair<byte[], byte[]> tuple)
+  {
+    if (taskQueue.size() > taskBarrier) {
+      //slow down to avoid accumulating too many outstanding tasks.
+      try {
+        
+        logger.info("Queue Size: {}, wait time(milli-seconds): {}", taskQueue.size(), taskQueue.size() / taskBarrier);
+        Thread.sleep(taskQueue.size() / taskBarrier);
+      } catch (Exception e) {
+        //ignore
+      }
+    }
+    
+    //send a get request to the state manager and record it in taskQueue and taskToPair;
+    //the extra taskQueue keeps the tasks in submission order
+    {
+      Slice key = new Slice(tuple.getKey());
+      Future<Slice> task = store.getAsync(getTimeByKey(tuple.getKey()), key);
+      taskQueue.add(task);
+      taskToPair.put(task, tuple);
+    }
+
+    //handle all the tasks which have finished
+    while (!taskQueue.isEmpty()) {
+      //assume tasks finish in order.
+      if (!taskQueue.peek().isDone()) {
+        break;
+      }
+
+      Future<Slice> task = taskQueue.poll();
+      insertValueToStore(taskToPair.remove(task));
+    }
+  }
+  
+  protected void insertValueToStore(KeyValPair<byte[], byte[]> tuple)
+  {
     Slice key = new Slice(tuple.getKey());
     Slice value = new Slice(tuple.getValue());
+
     store.put(bucketId, key, value);
     ++tupleCount;
   }
@@ -106,12 +199,12 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
     logger.info("beforeCheckpoint {}", windowId);
   }
 
-  public ManagedStateImpl getStore()
+  public ManagedTimeUnifiedStateImpl getStore()
   {
     return store;
   }
 
-  public void setStore(ManagedStateImpl store)
+  public void setStore(ManagedTimeUnifiedStateImpl store)
   {
     this.store = store;
   }
@@ -119,9 +212,45 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
   protected void logStatistics()
   {
     long spentTime = System.currentTimeMillis() - statisticsBeginTime;
-    logger.info("Time Spent: {}, Processed tuples: {}, rate: {}", spentTime, tupleCount, tupleCount / spentTime);
+    logger.info("Time Spent: {}, Processed tuples: {}, rate per second: {}", spentTime, tupleCount, tupleCount * 1000 / spentTime);
 
     statisticsBeginTime = System.currentTimeMillis();
     tupleCount = 0;
   }
+
+  public ExecMode getExecMode()
+  {
+    return execMode;
+  }
+
+  public void setExecMode(ExecMode execMode)
+  {
+    this.execMode = execMode;
+  }
+
+  public String getExecModeString()
+  {
+    return execMode.name();
+  }
+  
+  public void setExeModeStr(String execModeStr)
+  {
+    //this setter is used for configuration, so match the value case-insensitively
+    for (ExecMode em : ExecMode.values()) {
+      if (em.name().equalsIgnoreCase(execModeStr)) {
+        this.execMode = em;
+      }
+    }
+  }
+
+  public int getTimeRange()
+  {
+    return timeRange;
+  }
+
+  public void setTimeRange(int timeRange)
+  {
+    this.timeRange = timeRange;
+  }
+  
 }
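
A note on the update paths added above: both UPDATESYNC and UPDATEASYNC look up the existing value using a time bucket derived from the key itself, via getTimeByKey, which floors the long encoded in the first eight bytes of the key to the start of its timeRange window. A small standalone sketch of that calculation (hypothetical class and values, not part of the commit):

import java.nio.ByteBuffer;

public class TimeByKeyDemo
{
  // same flooring logic as StoreOperator.getTimeByKey
  static long getTimeByKey(byte[] key, int timeRange)
  {
    long lKey = ByteBuffer.wrap(key).getLong();
    return lKey - (lKey % timeRange);
  }

  public static void main(String[] args)
  {
    int timeRange = 1000 * 60; // one minute, the operator's default
    byte[] key = ByteBuffer.allocate(8).putLong(90500L).array();
    // 90500 falls in the [60000, 120000) window, so this prints 60000
    System.out.println(getTimeByKey(key, timeRange));
  }
}

In the asynchronous path, the extra taskQueue preserves submission order so completed futures can be drained from the head, under the operator's stated assumption that the get tasks finish in order.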

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b6c48bb3/benchmark/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/benchmark/src/main/resources/META-INF/properties.xml b/benchmark/src/main/resources/META-INF/properties.xml
index f6d0ffc..b6131e4 100644
--- a/benchmark/src/main/resources/META-INF/properties.xml
+++ b/benchmark/src/main/resources/META-INF/properties.xml
@@ -193,6 +193,11 @@
     <name>dt.application.CouchBaseAppInput.operator.couchbaseInput.store.passwordMeta</name>
     <value></value>
   </property>
+  <property>
+    <name>dt.application.ManagedStateBenchmark.operator.Store.execModeStr</name>
+    <!-- valid values are: INSERT, UPDATESYNC, UPDATEASYNC -->
+    <value>INSERT</value>
+  </property>
 
 </configuration>
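
To exercise one of the new update modes instead of the default insert, the same property can be overridden, for example in a site-specific configuration file (a sketch reusing the property name and the valid values listed above):

  <property>
    <name>dt.application.ManagedStateBenchmark.operator.Store.execModeStr</name>
    <value>UPDATEASYNC</value>
  </property>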
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b6c48bb3/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
index ca5e245..93a7720 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
@@ -18,13 +18,18 @@
  */
 package com.datatorrent.benchmark.state;
 
+import java.io.File;
+
+import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
 
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.LocalMode;
 import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.benchmark.state.StoreOperator.ExecMode;
 
 /**
  * This is not a really unit test, but in fact a benchmark runner.
@@ -34,9 +39,32 @@ import com.datatorrent.api.StreamingApplication;
 public class ManagedStateBenchmarkAppTester extends ManagedStateBenchmarkApp
 {
   public static final String basePath = "target/temp";
-
+  
+  @Before
+  public void before()
+  {
+    FileUtil.fullyDelete(new File(basePath));
+  }
+  
+  @Test
+  public void testUpdateSync() throws Exception
+  {
+    test(ExecMode.UPDATESYNC);
+  }
+  
   @Test
-  public void test() throws Exception
+  public void testUpdateAsync() throws Exception
+  {
+    test(ExecMode.UPDATEASYNC);
+  }
+  
+  @Test
+  public void testInsert() throws Exception
+  {
+    test(ExecMode.INSERT);
+  }
+  
+  public void test(ExecMode exeMode) throws Exception
   {
     Configuration conf = new Configuration(false);
 
@@ -44,7 +72,8 @@ public class ManagedStateBenchmarkAppTester extends ManagedStateBenchmarkApp
     DAG dag = lma.getDAG();
 
     super.populateDAG(dag, conf);
-
+    storeOperator.execMode = exeMode;
+    
     StreamingApplication app = new StreamingApplication()
     {
       @Override
@@ -62,6 +91,8 @@ public class ManagedStateBenchmarkAppTester extends ManagedStateBenchmarkApp
     lc.shutdown();
   }
 
+
+  
   @Override
   public String getStoreBasePath(Configuration conf)
   {
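
Since these test methods are benchmark drivers rather than real unit tests, a single variant is typically run explicitly. Assuming the Maven module directory is benchmark and a Surefire version that supports single-method selection, an invocation might look like:

  mvn test -pl benchmark -Dtest=ManagedStateBenchmarkAppTester#testUpdateAsync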
