MLHR-1803 Made the twitter demo us an embeddable query operator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/caf85bbd Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/caf85bbd Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/caf85bbd Branch: refs/heads/feature-AppData Commit: caf85bbdeb29a462865c60c63198f3ca06613bbe Parents: fcc380c Author: Timothy Farkas <[email protected]> Authored: Tue Aug 11 15:23:45 2015 -0700 Committer: David Yan <[email protected]> Committed: Fri Aug 28 18:49:43 2015 -0700 ---------------------------------------------------------------------- demos/pom.xml | 2 +- .../twitter/TwitterTopCounterApplication.java | 8 +- .../src/main/resources/META-INF/properties.xml | 4 + .../com/datatorrent/lib/appdata/StoreUtils.java | 84 +++++++++++ .../lib/appdata/query/WindowBoundedService.java | 147 +++++++++++++++++++ .../snapshot/AbstractAppDataSnapshotServer.java | 69 +++++++-- .../lib/io/PubSubWebSocketAppDataQuery.java | 82 ++++++++++- .../query/QueryManagerAsynchronousTest.java | 1 - .../appdata/query/WindowBoundedServiceTest.java | 124 ++++++++++++++++ pom.xml | 4 +- 10 files changed, 507 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/caf85bbd/demos/pom.xml ---------------------------------------------------------------------- diff --git a/demos/pom.xml b/demos/pom.xml index 2e93e02..7976aa0 100644 --- a/demos/pom.xml +++ b/demos/pom.xml @@ -28,7 +28,7 @@ </modules> <properties> - <datatorrent.version>3.0.0</datatorrent.version> + <datatorrent.version>3.1.0-SNAPSHOT</datatorrent.version> <datatorrent.apppackage.classpath>lib/*.jar</datatorrent.apppackage.classpath> <semver.plugin.skip>true</semver.plugin.skip> </properties> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/caf85bbd/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java index db59bdf..508b8a1 100644 --- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java +++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java @@ -195,7 +195,7 @@ public class TwitterTopCounterApplication implements StreamingApplication if (!StringUtils.isEmpty(gatewayAddress)) { URI uri = URI.create("ws://" + gatewayAddress + "/pubsub"); - AppDataSnapshotServerMap snapshotServer = dag.addOperator("Snapshot Server", new AppDataSnapshotServerMap()); + AppDataSnapshotServerMap snapshotServer = dag.addOperator("SnapshotServer", new AppDataSnapshotServerMap()); Map<String, String> conversionMap = Maps.newHashMap(); conversionMap.put(alias, WindowedTopCounter.FIELD_TYPE); @@ -204,15 +204,15 @@ public class TwitterTopCounterApplication implements StreamingApplication snapshotServer.setSnapshotSchemaJSON(snapshotServerJSON); snapshotServer.setTableFieldToMapField(conversionMap); - PubSubWebSocketAppDataQuery wsQuery = dag.addOperator("Query", new PubSubWebSocketAppDataQuery()); + PubSubWebSocketAppDataQuery wsQuery = new PubSubWebSocketAppDataQuery(); wsQuery.setUri(uri); - Operator.OutputPort<String> queryPort = wsQuery.outputPort; + snapshotServer.setEmbeddableQueryInfoProvider(wsQuery); + PubSubWebSocketAppDataResult wsResult = dag.addOperator("QueryResult", new PubSubWebSocketAppDataResult()); wsResult.setUri(uri); Operator.InputPort<String> queryResultPort = wsResult.input; dag.addStream("MapProvider", topCount, snapshotServer.input); - dag.addStream("Query", queryPort, snapshotServer.query); dag.addStream("Result", snapshotServer.queryResult, queryResultPort); } else { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/caf85bbd/demos/twitter/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/resources/META-INF/properties.xml b/demos/twitter/src/main/resources/META-INF/properties.xml index 53b7fb9..e2547cc 100644 --- a/demos/twitter/src/main/resources/META-INF/properties.xml +++ b/demos/twitter/src/main/resources/META-INF/properties.xml @@ -90,6 +90,10 @@ <value>TwitterHashtagQueryDemo</value> </property> <property> + <name>dt.application.TwitterTrendingDemo.operator.SnapshotServer.embeddableQueryInfoProvider.topic</name> + <value>TwitterHashtagQueryDemo</value> + </property> + <property> <name>dt.application.TwitterTrendingDemo.operator.QueryResult.topic</name> <value>TwitterHashtagQueryResultDemo</value> </property> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/caf85bbd/library/src/main/java/com/datatorrent/lib/appdata/StoreUtils.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/appdata/StoreUtils.java b/library/src/main/java/com/datatorrent/lib/appdata/StoreUtils.java new file mode 100644 index 0000000..11efc02 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/appdata/StoreUtils.java @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2015 DataTorrent + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datatorrent.lib.appdata; + +import com.google.common.base.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.lib.io.SimpleSinglePortInputOperator.BufferingOutputPort; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Sink; + +public class StoreUtils +{ + /** + * This is a utility method which is used to attach the output port of an {@link EmbeddableQueryInfoProvider} to the input port + * of the encapsulating {@link AppData.Store}. + * @param <T> The type of data emitted by the {@link EmbeddableQueryInfoProvider}'s output port and received by the + * {@link AppData.Store}'s input port. + * @param outputPort The output port of the {@link EmbeddableQueryInfoProvider} which is being used by an {@link AppData.Store}. + * @param inputPort The input port of the {@link AppData.Store} which is using an {@link EmbeddableQueryInfoProvider}. + */ + public static <T> void attachOutputPortToInputPort(DefaultOutputPort<T> outputPort, final DefaultInputPort<T> inputPort) + { + outputPort.setSink( + new Sink<Object>() + { + @Override + @SuppressWarnings("unchecked") + public void put(Object tuple) + { + LOG.debug("processing tuple"); + inputPort.process((T)tuple); + } + + @Override + public int getCount(boolean reset) + { + return 0; + } + + } + ); + } + + /** + * This is a utility class which is responsible for flushing {@link BufferingOutputPort}s. + * @param <TUPLE_TYPE> The type of the tuple emitted by the {@link BufferingOutputPort}. + */ + public static class BufferingOutputPortFlusher<TUPLE_TYPE> implements Runnable + { + private final BufferingOutputPort<TUPLE_TYPE> port; + + public BufferingOutputPortFlusher(BufferingOutputPort<TUPLE_TYPE> port) + { + this.port = Preconditions.checkNotNull(port); + } + + @Override + public void run() + { + port.flush(Integer.MAX_VALUE); + } + } + + private static final Logger LOG = LoggerFactory.getLogger(StoreUtils.class); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/caf85bbd/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java b/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java new file mode 100644 index 0000000..7b82f4f --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2015 DataTorrent + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datatorrent.lib.appdata.query; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; + + +import com.google.common.base.Preconditions; + +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.Component; +import com.datatorrent.api.Context.OperatorContext; + +import com.datatorrent.common.util.NameableThreadFactory; +import com.datatorrent.netlet.util.DTThrowable; + +/** + * This class asynchronously executes a function so that the function is only called between calls + * to {@link Operator#beginWindow} and {@link Operator#endWindow}.<br/><br/> + * This service works by asynchronously calling its {@link #execute} method only after + * {@link #beginWindow} and called and before {@link #endWindow} ends. Calls to {@link #beginWindow} + * and {@link endWindow} will happen in the enclosing {@link Operator}'s main thread. + * <br/><br/> + * <b>Note:</b> This service cannot be used in operators which allow checkpointing within an + * application window. + */ +public class WindowBoundedService implements Component<OperatorContext> +{ + public static final long DEFAULT_FLUSH_INTERVAL_MILLIS = 10; + + /** + * The execute interval period in milliseconds. + */ + private final long executeIntervalMillis; + /** + * The code to execute asynchronously. + */ + private final Runnable runnable; + protected transient ExecutorService executorThread; + + private final transient Semaphore mutex = new Semaphore(0); + + public WindowBoundedService(Runnable runnable) + { + this.executeIntervalMillis = DEFAULT_FLUSH_INTERVAL_MILLIS; + this.runnable = Preconditions.checkNotNull(runnable); + } + + public WindowBoundedService(long executeIntervalMillis, + Runnable runnable) + { + Preconditions.checkArgument(executeIntervalMillis > 0, + "The executeIntervalMillis must be positive"); + this.executeIntervalMillis = executeIntervalMillis; + this.runnable = Preconditions.checkNotNull(runnable); + } + + @Override + public void setup(OperatorContext context) + { + executorThread = Executors.newSingleThreadScheduledExecutor(new NameableThreadFactory("Query Executor Thread")); + executorThread.submit(new AsynchExecutorThread(Thread.currentThread())); + } + + public void beginWindow(long windowId) + { + mutex.release(); + } + + public void endWindow() + { + try { + mutex.acquire(); + } catch (InterruptedException ex) { + DTThrowable.wrapIfChecked(ex); + } + } + + @Override + public void teardown() + { + executorThread.shutdownNow(); + } + + public class AsynchExecutorThread implements Callable<Void> + { + private final Thread mainThread; + private long lastExecuteTime = 0; + + public AsynchExecutorThread(Thread mainThread) + { + this.mainThread = mainThread; + } + + @Override + @SuppressWarnings({"BroadCatchBlock", "TooBroadCatch"}) + public Void call() throws Exception + { + try { + loop(); + } catch (Exception e) { + LOG.error("Exception thrown while processing:", e); + mutex.release(); + mainThread.interrupt(); + } + + return null; + } + + @SuppressWarnings("SleepWhileInLoop") + private void loop() throws Exception + { + while (true) { + long currentTime = System.currentTimeMillis(); + long diff = currentTime - lastExecuteTime; + if (diff > executeIntervalMillis) { + lastExecuteTime = currentTime; + mutex.acquireUninterruptibly(); + runnable.run(); + mutex.release(); + } else { + Thread.sleep(executeIntervalMillis - diff); + } + } + } + } + + private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(WindowBoundedService.class); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/caf85bbd/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java index d0241d2..a51908f 100644 --- a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java +++ b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java @@ -21,6 +21,7 @@ import java.util.List; import javax.validation.constraints.NotNull; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.slf4j.Logger; @@ -28,6 +29,7 @@ import org.slf4j.LoggerFactory; import org.apache.commons.lang3.mutable.MutableLong; +import com.datatorrent.lib.appdata.StoreUtils; import com.datatorrent.lib.appdata.gpo.GPOMutable; import com.datatorrent.lib.appdata.query.AppDataWindowEndQueueManager; import com.datatorrent.lib.appdata.query.QueryExecutor; @@ -43,8 +45,10 @@ import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.Operator; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; import com.datatorrent.common.experimental.AppData; +import com.datatorrent.common.experimental.AppData.EmbeddableQueryInfoProvider; /** * This is an abstract operator for the {@link SnapshotSchema}. This operator is designed to accept input data @@ -54,7 +58,7 @@ import com.datatorrent.common.experimental.AppData; * @param <INPUT_EVENT> The type of the input events that the operator accepts. * @since 3.0.0 */ -public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Operator +public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Operator, AppData.Store<String> { /** * The {@link QueryManagerSynchronous} for the operator. @@ -84,11 +88,13 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper * The current data to be served by the operator. */ private List<GPOMutable> currentData = Lists.newArrayList(); + private EmbeddableQueryInfoProvider<String> embeddableQueryInfoProvider; @AppData.ResultPort public final transient DefaultOutputPort<String> queryResult = new DefaultOutputPort<String>(); @AppData.QueryPort + @InputPortFieldAnnotation(optional=true) public transient final DefaultInputPort<String> query = new DefaultInputPort<String>() { @Override @@ -99,24 +105,22 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper try { query = queryDeserializerFactory.deserialize(queryJSON); - } - catch(IOException ex) { + } catch (IOException ex) { LOG.error("Error parsing query: {}", queryJSON); LOG.error("{}", ex); return; } - if(query instanceof SchemaQuery) { - SchemaResult schemaResult = schemaRegistry.getSchemaResult((SchemaQuery) query); + if (query instanceof SchemaQuery) { + SchemaResult schemaResult = schemaRegistry.getSchemaResult((SchemaQuery)query); - if(schemaResult != null) { + if (schemaResult != null) { String schemaResultJSON = resultSerializerFactory.serialize(schemaResult); LOG.debug("emitting {}", schemaResultJSON); queryResult.emit(schemaResultJSON); } - } - else if(query instanceof DataQuerySnapshot) { - queryProcessor.enqueue((DataQuerySnapshot) query, null, null); + } else if (query instanceof DataQuerySnapshot) { + queryProcessor.enqueue((DataQuerySnapshot)query, null, null); } } }; @@ -150,6 +154,13 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper */ public abstract GPOMutable convert(INPUT_EVENT inputEvent); + + @Override + final public void activate(OperatorContext ctx) + { + embeddableQueryInfoProvider.activate(ctx); + } + @SuppressWarnings("unchecked") @Override public void setup(OperatorContext context) @@ -164,17 +175,33 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper queryDeserializerFactory.setContext(DataQuerySnapshot.class, schemaRegistry); resultSerializerFactory = new MessageSerializerFactory(resultFormatter); queryProcessor.setup(context); + + if (embeddableQueryInfoProvider != null) { + embeddableQueryInfoProvider.enableEmbeddedMode(); + LOG.info("An embeddable query operator is being used of class {}.", embeddableQueryInfoProvider.getClass().getName()); + StoreUtils.attachOutputPortToInputPort(embeddableQueryInfoProvider.getOutputPort(), + query); + embeddableQueryInfoProvider.setup(context); + } } @Override public void beginWindow(long windowId) { + if (embeddableQueryInfoProvider != null) { + embeddableQueryInfoProvider.beginWindow(windowId); + } + queryProcessor.beginWindow(windowId); } @Override public void endWindow() { + if (embeddableQueryInfoProvider != null) { + embeddableQueryInfoProvider.endWindow(); + } + { Result result = null; @@ -191,9 +218,21 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper @Override public void teardown() { + if (embeddableQueryInfoProvider != null) { + embeddableQueryInfoProvider.teardown(); + } + queryProcessor.teardown(); } + @Override + public void deactivate() + { + if (embeddableQueryInfoProvider != null) { + embeddableQueryInfoProvider.deactivate(); + } + } + /** * Gets the JSON for the schema. * @return the JSON for the schema. @@ -230,6 +269,18 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper this.resultFormatter = resultFormatter; } + @Override + public EmbeddableQueryInfoProvider<String> getEmbeddableQueryInfoProvider() + { + return embeddableQueryInfoProvider; + } + + @Override + public void setEmbeddableQueryInfoProvider(EmbeddableQueryInfoProvider<String> embeddableQueryInfoProvider) + { + this.embeddableQueryInfoProvider = Preconditions.checkNotNull(embeddableQueryInfoProvider); + } + /** * The {@link QueryExecutor} which returns the results for queries. */ http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/caf85bbd/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java index 14a2d2b..031befd 100644 --- a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java +++ b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java @@ -19,16 +19,23 @@ package com.datatorrent.lib.io; import java.net.URI; import java.net.URISyntaxException; +import javax.validation.constraints.Min; + import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datatorrent.lib.appdata.StoreUtils.BufferingOutputPortFlusher; +import com.datatorrent.lib.appdata.query.WindowBoundedService; + import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.common.experimental.AppData; +import com.datatorrent.common.experimental.AppData.EmbeddableQueryInfoProvider; import com.datatorrent.common.util.PubSubMessage; /** @@ -40,11 +47,18 @@ import com.datatorrent.common.util.PubSubMessage; * @tags input, app data, query * @since 3.0.0 */ -public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<String> implements AppData.ConnectionInfoProvider +public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<String> implements AppData.ConnectionInfoProvider, EmbeddableQueryInfoProvider<String> { private static final Logger logger = LoggerFactory.getLogger(PubSubWebSocketAppDataQuery.class); private static final long serialVersionUID = 201506121124L; + public static final long DEFAULT_EXECUTE_INTERVAL_MILLIS = 10; + + private boolean useEmitThread; + @Min(0) + private long executeIntervalMillis = DEFAULT_EXECUTE_INTERVAL_MILLIS; + + private transient WindowBoundedService windowBoundedService; public PubSubWebSocketAppDataQuery() { @@ -57,6 +71,42 @@ public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<St this.uri = uriHelper(context, uri); logger.debug("Setting up:\nuri:{}\ntopic:{}",this.getUri(), this.getTopic()); super.setup(context); + + if (useEmitThread) { + windowBoundedService = new WindowBoundedService(executeIntervalMillis, + new BufferingOutputPortFlusher<>(this.outputPort)); + windowBoundedService.setup(context); + } + } + + @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + + if (windowBoundedService != null) { + windowBoundedService.beginWindow(windowId); + } + } + + @Override + public void endWindow() + { + if (windowBoundedService != null) { + windowBoundedService.endWindow(); + } + + super.endWindow(); + } + + @Override + public void teardown() + { + if (windowBoundedService != null) { + windowBoundedService.teardown(); + } + + super.teardown(); } public static URI uriHelper(OperatorContext context, URI uri) @@ -139,4 +189,34 @@ public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<St { return "pubsub"; } + + @Override + public DefaultOutputPort<String> getOutputPort() + { + return outputPort; + } + + @Override + public void enableEmbeddedMode() + { + useEmitThread = true; + } + + /** + * Get the number of milliseconds between calls to execute. + * @return The number of milliseconds between calls to execute. + */ + public long getExecuteIntervalMillis() + { + return executeIntervalMillis; + } + + /** + * The number of milliseconds between calls to execute. + * @param executeIntervalMillis The number of milliseconds between calls to execute. + */ + public void setExecuteIntervalMillis(long executeIntervalMillis) + { + this.executeIntervalMillis = executeIntervalMillis; + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/caf85bbd/library/src/test/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronousTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronousTest.java b/library/src/test/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronousTest.java index ef5f0f5..277bb3a 100644 --- a/library/src/test/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronousTest.java +++ b/library/src/test/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronousTest.java @@ -54,7 +54,6 @@ public class QueryManagerAsynchronousTest Thread.sleep(200); } catch(InterruptedException ex) { - throw new RuntimeException(ex); } Thread.interrupted(); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/caf85bbd/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java b/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java new file mode 100644 index 0000000..3674ce5 --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java @@ -0,0 +1,124 @@ +/* + * Copyright (c) 2015 DataTorrent + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datatorrent.lib.appdata.query; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; + +import com.datatorrent.lib.appdata.query.QueryManagerAsynchronousTest.InterruptClear; + +public class WindowBoundedServiceTest +{ + @Rule + public TestWatcher testMeta = new InterruptClear(); + + @Test + public void simpleLoopTest() throws Exception + { + CounterRunnable counterRunnable = new CounterRunnable(); + + WindowBoundedService wbs = new WindowBoundedService(1, + counterRunnable); + wbs.setup(null); + Thread.sleep(500); + Assert.assertEquals(0, counterRunnable.getCounter()); + wbs.beginWindow(0); + Thread.sleep(500); + wbs.endWindow(); + int currentCount = counterRunnable.getCounter(); + Thread.sleep(500); + wbs.teardown(); + Assert.assertEquals(currentCount, counterRunnable.getCounter()); + } + + @Test + public void runTest() throws Exception + { + CounterRunnable counterRunnable = new CounterRunnable(); + + WindowBoundedService wbs = new WindowBoundedService(1, + counterRunnable); + wbs.setup(null); + wbs.beginWindow(0); + Thread.sleep(500); + wbs.endWindow(); + wbs.teardown(); + Assert.assertTrue(counterRunnable.getCounter() > 0); + } + + @Test + public void exceptionTest() throws Exception + { + WindowBoundedService wbs = new WindowBoundedService(1, + new ExceptionRunnable()); + + wbs.setup(null); + wbs.beginWindow(0); + + boolean caughtException = false; + + try { + Thread.sleep(500); + } catch (InterruptedException e) { + caughtException = true; + } + + try { + wbs.endWindow(); + } catch(Exception e) { + caughtException = true; + } + + wbs.teardown(); + Assert.assertEquals(true, caughtException); + } + + public static class CounterRunnable implements Runnable + { + private int counter = 0; + + public CounterRunnable() + { + } + + @Override + public void run() + { + counter++; + } + + public int getCounter() + { + return counter; + } + } + + public static class ExceptionRunnable implements Runnable + { + public ExceptionRunnable() + { + } + + @Override + public void run() + { + throw new RuntimeException("Simulate Failure"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/caf85bbd/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index c95f524..e9dbcd6 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ <parent> <groupId>com.datatorrent</groupId> <artifactId>dt-framework</artifactId> - <version>3.0.0</version> + <version>3.1.0-SNAPSHOT</version> </parent> <artifactId>malhar</artifactId> @@ -38,7 +38,7 @@ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <netbeans.hint.license>malhar-inc</netbeans.hint.license> <maven.deploy.skip>false</maven.deploy.skip> - <dt.framework.version>3.0.0</dt.framework.version> + <dt.framework.version>3.1.0-SNAPSHOT</dt.framework.version> <!-- the following properties match the properties defined in core/pom.xml --> <jackson.version>1.9.2</jackson.version> <jersey.version>1.9</jersey.version>
