Made the twitter demo use an embeddable query operator.

Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/819e175f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/819e175f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/819e175f

Branch: refs/heads/feature-AppData
Commit: 819e175f54ab9869d932ec04b73cfaff451387ac
Parents: 6cff911
Author: Timothy Farkas <[email protected]>
Authored: Tue Aug 11 15:23:45 2015 -0700
Committer: David Yan <[email protected]>
Committed: Fri Aug 28 18:09:32 2015 -0700

----------------------------------------------------------------------
 demos/pom.xml                                   |   2 +-
 .../twitter/TwitterTopCounterApplication.java   |   8 +-
 .../src/main/resources/META-INF/properties.xml  |   4 +
 .../com/datatorrent/lib/appdata/StoreUtils.java |  84 +++++++++++
 .../lib/appdata/query/WindowBoundedService.java | 147 +++++++++++++++++++
 .../snapshot/AbstractAppDataSnapshotServer.java |  69 +++++++--
 .../lib/io/PubSubWebSocketAppDataQuery.java     |  82 ++++++++++-
 .../query/QueryManagerAsynchronousTest.java     |   1 -
 .../appdata/query/WindowBoundedServiceTest.java | 124 ++++++++++++++++
 pom.xml                                         |   4 +-
 10 files changed, 507 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/demos/pom.xml
----------------------------------------------------------------------
diff --git a/demos/pom.xml b/demos/pom.xml
index 2e93e02..7976aa0 100644
--- a/demos/pom.xml
+++ b/demos/pom.xml
@@ -28,7 +28,7 @@
   </modules>
 
   <properties>
-    <datatorrent.version>3.0.0</datatorrent.version>
+    <datatorrent.version>3.1.0-SNAPSHOT</datatorrent.version>
     
<datatorrent.apppackage.classpath>lib/*.jar</datatorrent.apppackage.classpath>
     <semver.plugin.skip>true</semver.plugin.skip>
   </properties>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java
----------------------------------------------------------------------
diff --git 
a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java
 
b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java
index db59bdf..508b8a1 100644
--- 
a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java
+++ 
b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java
@@ -195,7 +195,7 @@ public class TwitterTopCounterApplication implements 
StreamingApplication
     if (!StringUtils.isEmpty(gatewayAddress)) {
       URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
 
-      AppDataSnapshotServerMap snapshotServer = dag.addOperator("Snapshot 
Server", new AppDataSnapshotServerMap());
+      AppDataSnapshotServerMap snapshotServer = 
dag.addOperator("SnapshotServer", new AppDataSnapshotServerMap());
 
       Map<String, String> conversionMap = Maps.newHashMap();
       conversionMap.put(alias, WindowedTopCounter.FIELD_TYPE);
@@ -204,15 +204,15 @@ public class TwitterTopCounterApplication implements 
StreamingApplication
       snapshotServer.setSnapshotSchemaJSON(snapshotServerJSON);
       snapshotServer.setTableFieldToMapField(conversionMap);
 
-      PubSubWebSocketAppDataQuery wsQuery = dag.addOperator("Query", new 
PubSubWebSocketAppDataQuery());
+      PubSubWebSocketAppDataQuery wsQuery = new PubSubWebSocketAppDataQuery();
       wsQuery.setUri(uri);
-      Operator.OutputPort<String> queryPort = wsQuery.outputPort;
+      snapshotServer.setEmbeddableQueryInfoProvider(wsQuery);
+
       PubSubWebSocketAppDataResult wsResult = dag.addOperator("QueryResult", 
new PubSubWebSocketAppDataResult());
       wsResult.setUri(uri);
       Operator.InputPort<String> queryResultPort = wsResult.input;
 
       dag.addStream("MapProvider", topCount, snapshotServer.input);
-      dag.addStream("Query", queryPort, snapshotServer.query);
       dag.addStream("Result", snapshotServer.queryResult, queryResultPort);
     }
     else {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/demos/twitter/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/resources/META-INF/properties.xml 
b/demos/twitter/src/main/resources/META-INF/properties.xml
index 53b7fb9..e2547cc 100644
--- a/demos/twitter/src/main/resources/META-INF/properties.xml
+++ b/demos/twitter/src/main/resources/META-INF/properties.xml
@@ -90,6 +90,10 @@
     <value>TwitterHashtagQueryDemo</value>
   </property>
   <property>
+    
<name>dt.application.TwitterTrendingDemo.operator.SnapshotServer.embeddableQueryInfoProvider.topic</name>
+    <value>TwitterHashtagQueryDemo</value>
+  </property>
+  <property>
     <name>dt.application.TwitterTrendingDemo.operator.QueryResult.topic</name>
     <value>TwitterHashtagQueryResultDemo</value>
   </property>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/library/src/main/java/com/datatorrent/lib/appdata/StoreUtils.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/StoreUtils.java 
b/library/src/main/java/com/datatorrent/lib/appdata/StoreUtils.java
new file mode 100644
index 0000000..11efc02
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/appdata/StoreUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) 2015 DataTorrent
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datatorrent.lib.appdata;
+
+import com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
com.datatorrent.lib.io.SimpleSinglePortInputOperator.BufferingOutputPort;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Sink;
+
+public class StoreUtils
+{
+  /**
+   * This is a utility method which is used to attach the output port of an 
{@link EmbeddableQueryInfoProvider} to the input port
+   * of the encapsulating {@link AppData.Store}.
+   * @param <T> The type of data emitted by the {@link 
EmbeddableQueryInfoProvider}'s output port and received by the
+   * {@link AppData.Store}'s input port.
+   * @param outputPort The output port of the {@link 
EmbeddableQueryInfoProvider} which is being used by an {@link AppData.Store}.
+   * @param inputPort The input port of the {@link AppData.Store} which is 
using an {@link EmbeddableQueryInfoProvider}.
+   */
+  public static <T> void attachOutputPortToInputPort(DefaultOutputPort<T> 
outputPort, final DefaultInputPort<T> inputPort)
+  {
+    outputPort.setSink(
+      new Sink<Object>()
+      {
+        @Override
+        @SuppressWarnings("unchecked")
+        public void put(Object tuple)
+        {
+          LOG.debug("processing tuple");
+          inputPort.process((T)tuple);
+        }
+
+        @Override
+        public int getCount(boolean reset)
+        {
+          return 0;
+        }
+
+      }
+    );
+  }
+
+  /**
+   * This is a utility class which is responsible for flushing {@link 
BufferingOutputPort}s.
+   * @param <TUPLE_TYPE> The type of the tuple emitted by the {@link 
BufferingOutputPort}.
+   */
+  public static class BufferingOutputPortFlusher<TUPLE_TYPE> implements 
Runnable
+  {
+    private final BufferingOutputPort<TUPLE_TYPE> port;
+
+    public BufferingOutputPortFlusher(BufferingOutputPort<TUPLE_TYPE> port)
+    {
+      this.port = Preconditions.checkNotNull(port);
+    }
+
+    @Override
+    public void run()
+    {
+      port.flush(Integer.MAX_VALUE);
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(StoreUtils.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java
 
b/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java
new file mode 100644
index 0000000..7b82f4f
--- /dev/null
+++ 
b/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright (c) 2015 DataTorrent
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datatorrent.lib.appdata.query;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+
+
+import com.google.common.base.Preconditions;
+
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context.OperatorContext;
+
+import com.datatorrent.common.util.NameableThreadFactory;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * This class asynchronously executes a function so that the function is only 
called between calls
+ * to {@link Operator#beginWindow} and {@link Operator#endWindow}.<br/><br/>
+ * This service works by asynchronously running the supplied {@link Runnable} 
only after
+ * {@link #beginWindow} is called and before {@link #endWindow} ends. Calls 
to {@link #beginWindow}
+ * and {@link #endWindow} will happen in the enclosing {@link Operator}'s main 
thread.
+ * <br/><br/>
+ * <b>Note:</b> This service cannot be used in operators which allow 
checkpointing within an
+ * application window.
+ */
+public class WindowBoundedService implements Component<OperatorContext>
+{
+  public static final long DEFAULT_FLUSH_INTERVAL_MILLIS = 10;
+
+  /**
+   * The execute interval period in milliseconds.
+   */
+  private final long executeIntervalMillis;
+  /**
+   * The code to execute asynchronously.
+   */
+  private final Runnable runnable;
+  protected transient ExecutorService executorThread;
+
+  private final transient Semaphore mutex = new Semaphore(0);
+
+  public WindowBoundedService(Runnable runnable)
+  {
+    this.executeIntervalMillis = DEFAULT_FLUSH_INTERVAL_MILLIS;
+    this.runnable = Preconditions.checkNotNull(runnable);
+  }
+
+  public WindowBoundedService(long executeIntervalMillis,
+                              Runnable runnable)
+  {
+    Preconditions.checkArgument(executeIntervalMillis > 0,
+                                "The executeIntervalMillis must be positive");
+    this.executeIntervalMillis = executeIntervalMillis;
+    this.runnable = Preconditions.checkNotNull(runnable);
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    executorThread = Executors.newSingleThreadScheduledExecutor(new 
NameableThreadFactory("Query Executor Thread"));
+    executorThread.submit(new AsynchExecutorThread(Thread.currentThread()));
+  }
+
+  public void beginWindow(long windowId)
+  {
+    mutex.release();
+  }
+
+  public void endWindow()
+  {
+    try {
+      mutex.acquire();
+    } catch (InterruptedException ex) {
+      DTThrowable.wrapIfChecked(ex);
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+    executorThread.shutdownNow();
+  }
+
+  public class AsynchExecutorThread implements Callable<Void>
+  {
+    private final Thread mainThread;
+    private long lastExecuteTime = 0;
+
+    public AsynchExecutorThread(Thread mainThread)
+    {
+      this.mainThread = mainThread;
+    }
+
+    @Override
+    @SuppressWarnings({"BroadCatchBlock", "TooBroadCatch"})
+    public Void call() throws Exception
+    {
+      try {
+        loop();
+      } catch (Exception e) {
+        LOG.error("Exception thrown while processing:", e);
+        mutex.release();
+        mainThread.interrupt();
+      }
+
+      return null;
+    }
+
+    @SuppressWarnings("SleepWhileInLoop")
+    private void loop() throws Exception
+    {
+      while (true) {
+        long currentTime = System.currentTimeMillis();
+        long diff = currentTime - lastExecuteTime;
+        if (diff > executeIntervalMillis) {
+          lastExecuteTime = currentTime;
+          mutex.acquireUninterruptibly();
+          runnable.run();
+          mutex.release();
+        } else {
+          Thread.sleep(executeIntervalMillis - diff);
+        }
+      }
+    }
+  }
+
+  private static final org.slf4j.Logger LOG = 
LoggerFactory.getLogger(WindowBoundedService.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
 
b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
index d0241d2..a51908f 100644
--- 
a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
+++ 
b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
@@ -21,6 +21,7 @@ import java.util.List;
 
 import javax.validation.constraints.NotNull;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 import org.slf4j.Logger;
@@ -28,6 +29,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.commons.lang3.mutable.MutableLong;
 
+import com.datatorrent.lib.appdata.StoreUtils;
 import com.datatorrent.lib.appdata.gpo.GPOMutable;
 import com.datatorrent.lib.appdata.query.AppDataWindowEndQueueManager;
 import com.datatorrent.lib.appdata.query.QueryExecutor;
@@ -43,8 +45,10 @@ import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
 
 import com.datatorrent.common.experimental.AppData;
+import com.datatorrent.common.experimental.AppData.EmbeddableQueryInfoProvider;
 
 /**
  * This is an abstract operator for the {@link SnapshotSchema}. This operator 
is designed to accept input data
@@ -54,7 +58,7 @@ import com.datatorrent.common.experimental.AppData;
  * @param <INPUT_EVENT> The type of the input events that the operator accepts.
  * @since 3.0.0
  */
-public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements 
Operator
+public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements 
Operator, AppData.Store<String>
 {
   /**
    * The {@link QueryManagerSynchronous} for the operator.
@@ -84,11 +88,13 @@ public abstract class 
AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
    * The current data to be served by the operator.
    */
   private List<GPOMutable> currentData = Lists.newArrayList();
+  private EmbeddableQueryInfoProvider<String> embeddableQueryInfoProvider;
 
   @AppData.ResultPort
   public final transient DefaultOutputPort<String> queryResult = new 
DefaultOutputPort<String>();
 
   @AppData.QueryPort
+  @InputPortFieldAnnotation(optional=true)
   public transient final DefaultInputPort<String> query = new 
DefaultInputPort<String>()
   {
     @Override
@@ -99,24 +105,22 @@ public abstract class 
AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
 
       try {
         query = queryDeserializerFactory.deserialize(queryJSON);
-      }
-      catch(IOException ex) {
+      } catch (IOException ex) {
         LOG.error("Error parsing query: {}", queryJSON);
         LOG.error("{}", ex);
         return;
       }
 
-      if(query instanceof SchemaQuery) {
-        SchemaResult schemaResult = 
schemaRegistry.getSchemaResult((SchemaQuery) query);
+      if (query instanceof SchemaQuery) {
+        SchemaResult schemaResult = 
schemaRegistry.getSchemaResult((SchemaQuery)query);
 
-        if(schemaResult != null) {
+        if (schemaResult != null) {
           String schemaResultJSON = 
resultSerializerFactory.serialize(schemaResult);
           LOG.debug("emitting {}", schemaResultJSON);
           queryResult.emit(schemaResultJSON);
         }
-      }
-      else if(query instanceof DataQuerySnapshot) {
-        queryProcessor.enqueue((DataQuerySnapshot) query, null, null);
+      } else if (query instanceof DataQuerySnapshot) {
+        queryProcessor.enqueue((DataQuerySnapshot)query, null, null);
       }
     }
   };
@@ -150,6 +154,13 @@ public abstract class 
AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
    */
   public abstract GPOMutable convert(INPUT_EVENT inputEvent);
 
+
+  @Override
+  final public void activate(OperatorContext ctx)
+  {
+    embeddableQueryInfoProvider.activate(ctx);
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   public void setup(OperatorContext context)
@@ -164,17 +175,33 @@ public abstract class 
AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
     queryDeserializerFactory.setContext(DataQuerySnapshot.class, 
schemaRegistry);
     resultSerializerFactory = new MessageSerializerFactory(resultFormatter);
     queryProcessor.setup(context);
+
+    if (embeddableQueryInfoProvider != null) {
+      embeddableQueryInfoProvider.enableEmbeddedMode();
+      LOG.info("An embeddable query operator is being used of class {}.", 
embeddableQueryInfoProvider.getClass().getName());
+      
StoreUtils.attachOutputPortToInputPort(embeddableQueryInfoProvider.getOutputPort(),
+                                             query);
+      embeddableQueryInfoProvider.setup(context);
+    }
   }
 
   @Override
   public void beginWindow(long windowId)
   {
+    if (embeddableQueryInfoProvider != null) {
+      embeddableQueryInfoProvider.beginWindow(windowId);
+    }
+
     queryProcessor.beginWindow(windowId);
   }
 
   @Override
   public void endWindow()
   {
+    if (embeddableQueryInfoProvider != null) {
+      embeddableQueryInfoProvider.endWindow();
+    }
+
     {
       Result result = null;
 
@@ -191,9 +218,21 @@ public abstract class 
AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
   @Override
   public void teardown()
   {
+    if (embeddableQueryInfoProvider != null) {
+      embeddableQueryInfoProvider.teardown();
+    }
+
     queryProcessor.teardown();
   }
 
+  @Override
+  public void deactivate()
+  {
+    if (embeddableQueryInfoProvider != null) {
+      embeddableQueryInfoProvider.deactivate();
+    }
+  }
+
   /**
    * Gets the JSON for the schema.
    * @return the JSON for the schema.
@@ -230,6 +269,18 @@ public abstract class 
AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
     this.resultFormatter = resultFormatter;
   }
 
+  @Override
+  public EmbeddableQueryInfoProvider<String> getEmbeddableQueryInfoProvider()
+  {
+    return embeddableQueryInfoProvider;
+  }
+
+  @Override
+  public void 
setEmbeddableQueryInfoProvider(EmbeddableQueryInfoProvider<String> 
embeddableQueryInfoProvider)
+  {
+    this.embeddableQueryInfoProvider = 
Preconditions.checkNotNull(embeddableQueryInfoProvider);
+  }
+
   /**
    * The {@link QueryExecutor} which returns the results for queries.
    */

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java 
b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
index 14a2d2b..031befd 100644
--- 
a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
+++ 
b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
@@ -19,16 +19,23 @@ package com.datatorrent.lib.io;
 import java.net.URI;
 import java.net.URISyntaxException;
 
+import javax.validation.constraints.Min;
+
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.lib.appdata.StoreUtils.BufferingOutputPortFlusher;
+import com.datatorrent.lib.appdata.query.WindowBoundedService;
+
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
 
 import com.datatorrent.common.experimental.AppData;
+import com.datatorrent.common.experimental.AppData.EmbeddableQueryInfoProvider;
 import com.datatorrent.common.util.PubSubMessage;
 
 /**
@@ -40,11 +47,18 @@ import com.datatorrent.common.util.PubSubMessage;
  * @tags input, app data, query
  * @since 3.0.0
  */
-public class PubSubWebSocketAppDataQuery extends 
PubSubWebSocketInputOperator<String> implements AppData.ConnectionInfoProvider
+public class PubSubWebSocketAppDataQuery extends 
PubSubWebSocketInputOperator<String> implements AppData.ConnectionInfoProvider, 
EmbeddableQueryInfoProvider<String>
 {
   private static final Logger logger = 
LoggerFactory.getLogger(PubSubWebSocketAppDataQuery.class);
 
   private static final long serialVersionUID = 201506121124L;
+  public static final long DEFAULT_EXECUTE_INTERVAL_MILLIS = 10;
+
+  private boolean useEmitThread;
+  @Min(0)
+  private long executeIntervalMillis = DEFAULT_EXECUTE_INTERVAL_MILLIS;
+
+  private transient WindowBoundedService windowBoundedService;
 
   public PubSubWebSocketAppDataQuery()
   {
@@ -57,6 +71,42 @@ public class PubSubWebSocketAppDataQuery extends 
PubSubWebSocketInputOperator<St
     this.uri = uriHelper(context, uri);
     logger.debug("Setting up:\nuri:{}\ntopic:{}",this.getUri(), 
this.getTopic());
     super.setup(context);
+
+    if (useEmitThread) {
+      windowBoundedService = new WindowBoundedService(executeIntervalMillis,
+                                                      new 
BufferingOutputPortFlusher<>(this.outputPort));
+      windowBoundedService.setup(context);
+    }
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    
+    if (windowBoundedService != null) {
+      windowBoundedService.beginWindow(windowId);
+    }
+  }
+
+  @Override
+  public void endWindow()
+  {
+    if (windowBoundedService != null) {
+      windowBoundedService.endWindow();
+    }
+
+    super.endWindow();
+  }
+
+  @Override
+  public void teardown()
+  {
+    if (windowBoundedService != null) {
+      windowBoundedService.teardown();
+    }
+
+    super.teardown();
   }
 
   public static URI uriHelper(OperatorContext context, URI uri)
@@ -139,4 +189,34 @@ public class PubSubWebSocketAppDataQuery extends 
PubSubWebSocketInputOperator<St
   {
     return "pubsub";
   }
+
+  @Override
+  public DefaultOutputPort<String> getOutputPort()
+  {
+    return outputPort;
+  }
+
+  @Override
+  public void enableEmbeddedMode()
+  {
+    useEmitThread = true;
+  }
+
+  /**
+   * Get the number of milliseconds between calls to execute.
+   * @return The number of milliseconds between calls to execute.
+   */
+  public long getExecuteIntervalMillis()
+  {
+    return executeIntervalMillis;
+  }
+
+  /**
+   * The number of milliseconds between calls to execute.
+   * @param executeIntervalMillis The number of milliseconds between calls to 
execute.
+   */
+  public void setExecuteIntervalMillis(long executeIntervalMillis)
+  {
+    this.executeIntervalMillis = executeIntervalMillis;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/library/src/test/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronousTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronousTest.java
 
b/library/src/test/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronousTest.java
index ef5f0f5..277bb3a 100644
--- 
a/library/src/test/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronousTest.java
+++ 
b/library/src/test/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronousTest.java
@@ -54,7 +54,6 @@ public class QueryManagerAsynchronousTest
         Thread.sleep(200);
       }
       catch(InterruptedException ex) {
-        throw new RuntimeException(ex);
       }
       Thread.interrupted();
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java
 
b/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java
new file mode 100644
index 0000000..3674ce5
--- /dev/null
+++ 
b/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright (c) 2015 DataTorrent
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datatorrent.lib.appdata.query;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+
+import 
com.datatorrent.lib.appdata.query.QueryManagerAsynchronousTest.InterruptClear;
+
+public class WindowBoundedServiceTest
+{
+  @Rule
+  public TestWatcher testMeta = new InterruptClear();
+
+  @Test
+  public void simpleLoopTest() throws Exception
+  {
+    CounterRunnable counterRunnable = new CounterRunnable();
+
+    WindowBoundedService wbs = new WindowBoundedService(1,
+                                                        counterRunnable);
+    wbs.setup(null);
+    Thread.sleep(500);
+    Assert.assertEquals(0, counterRunnable.getCounter());
+    wbs.beginWindow(0);
+    Thread.sleep(500);
+    wbs.endWindow();
+    int currentCount = counterRunnable.getCounter();
+    Thread.sleep(500);
+    wbs.teardown();
+    Assert.assertEquals(currentCount, counterRunnable.getCounter());
+  }
+
+  @Test
+  public void runTest() throws Exception
+  {
+    CounterRunnable counterRunnable = new CounterRunnable();
+
+    WindowBoundedService wbs = new WindowBoundedService(1,
+                                                        counterRunnable);
+    wbs.setup(null);
+    wbs.beginWindow(0);
+    Thread.sleep(500);
+    wbs.endWindow();
+    wbs.teardown();
+    Assert.assertTrue(counterRunnable.getCounter() > 0);
+  }
+
+  @Test
+  public void exceptionTest() throws Exception
+  {
+    WindowBoundedService wbs = new WindowBoundedService(1,
+                                                        new 
ExceptionRunnable());
+
+    wbs.setup(null);
+    wbs.beginWindow(0);
+
+    boolean caughtException = false;
+
+    try {
+      Thread.sleep(500);
+    } catch (InterruptedException e) {
+      caughtException = true;
+    }
+
+    try {
+      wbs.endWindow();
+    } catch(Exception e) {
+      caughtException = true;
+    }
+
+    wbs.teardown();
+    Assert.assertEquals(true, caughtException);
+  }
+
+  public static class CounterRunnable implements Runnable
+  {
+    private int counter = 0;
+
+    public CounterRunnable()
+    {
+    }
+
+    @Override
+    public void run()
+    {
+      counter++;
+    }
+
+    public int getCounter()
+    {
+      return counter;
+    }
+  }
+
+  public static class ExceptionRunnable implements Runnable
+  {
+    public ExceptionRunnable()
+    {
+    }
+
+    @Override
+    public void run()
+    {
+      throw new RuntimeException("Simulate Failure");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c95f524..e9dbcd6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
   <parent>
     <groupId>com.datatorrent</groupId>
     <artifactId>dt-framework</artifactId>
-    <version>3.0.0</version>
+    <version>3.1.0-SNAPSHOT</version>
   </parent>
 
   <artifactId>malhar</artifactId>
@@ -38,7 +38,7 @@
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <netbeans.hint.license>malhar-inc</netbeans.hint.license>
     <maven.deploy.skip>false</maven.deploy.skip>
-    <dt.framework.version>3.0.0</dt.framework.version>
+    <dt.framework.version>3.1.0-SNAPSHOT</dt.framework.version>
     <!-- the following properties match the properties defined in core/pom.xml 
-->
     <jackson.version>1.9.2</jackson.version>
     <jersey.version>1.9</jersey.version>

Reply via email to