Repository: incubator-apex-malhar Updated Branches: refs/heads/devel-3 66a1d58b8 -> 873667bce
MLHR-1824 Add App Data support as a separate app Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/85ca2e5f Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/85ca2e5f Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/85ca2e5f Branch: refs/heads/devel-3 Commit: 85ca2e5f4a66586124719c3d5ff91f0fa57ec757 Parents: 91321ce Author: Munagala V. Ramanath <[email protected]> Authored: Fri Aug 28 09:16:31 2015 -0700 Committer: Timothy Farkas <[email protected]> Committed: Tue Sep 29 12:38:37 2015 -0700 ---------------------------------------------------------------------- .../com/datatorrent/demos/pi/Application.java | 53 +------- .../demos/pi/ApplicationAppData.java | 132 +++++++++++++++++++ .../src/main/resources/META-INF/properties.xml | 22 +++- 3 files changed, 157 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/85ca2e5f/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java ---------------------------------------------------------------------- diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java index 382d9c8..b33a553 100644 --- a/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java +++ b/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java @@ -15,23 +15,15 @@ */ package com.datatorrent.demos.pi; -import java.net.URI; - -import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.testbench.RandomEventGenerator; + import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; -import com.datatorrent.api.Operator; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.lib.appdata.schemas.SchemaUtils; -import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap; -import com.datatorrent.lib.io.ConsoleOutputOperator; -import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery; -import com.datatorrent.lib.io.PubSubWebSocketAppDataResult; -import com.datatorrent.lib.testbench.RandomEventGenerator; - /** * Monte Carlo PI estimation demo : <br> @@ -83,8 +75,6 @@ import com.datatorrent.lib.testbench.RandomEventGenerator; @ApplicationAnnotation(name="PiDemo") public class Application implements StreamingApplication { - public static final String SNAPSHOT_SCHEMA = "PiDemoDataSchema.json"; - private final Locality locality = null; @Override @@ -92,42 +82,9 @@ public class Application implements StreamingApplication { RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator()); PiCalculateOperator calc = dag.addOperator("picalc", new PiCalculateOperator()); - - + ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator()); dag.addStream("rand_calc", rand.integer_data, calc.input).setLocality(locality); - - String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS); - - if (StringUtils.isEmpty(gatewayAddress)) { - ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator()); - dag.addStream("rand_console", calc.output, console.input).setLocality(locality); - - } else { - - URI uri = URI.create("ws://" + gatewayAddress + "/pubsub"); - - AppDataSnapshotServerMap snapshotServer - = dag.addOperator("Snapshot Server", new AppDataSnapshotServerMap()); - - String snapshotServerJSON = SchemaUtils.jarResourceFileToString(SNAPSHOT_SCHEMA); - - snapshotServer.setSnapshotSchemaJSON(snapshotServerJSON); - - PubSubWebSocketAppDataQuery wsQuery = dag.addOperator("Query", new PubSubWebSocketAppDataQuery()); - PubSubWebSocketAppDataResult wsResult = dag.addOperator("QueryResult", new PubSubWebSocketAppDataResult()); - - wsQuery.setUri(uri); - wsResult.setUri(uri); - Operator.OutputPort<String> queryPort = wsQuery.outputPort; - Operator.InputPort<String> queryResultPort = wsResult.input; - - NamedValueList<Object> adaptor = dag.addOperator("adaptor", new NamedValueList<Object>()); - - dag.addStream("PiValues", calc.output, adaptor.inPort); - dag.addStream("NamedPiValues", adaptor.outPort, snapshotServer.input); - dag.addStream("Query", queryPort, snapshotServer.query); - dag.addStream("Result", snapshotServer.queryResult, queryResultPort); - } + dag.addStream("rand_console",calc.output, console.input).setLocality(locality); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/85ca2e5f/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java ---------------------------------------------------------------------- diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java new file mode 100644 index 0000000..ffe4971 --- /dev/null +++ b/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java @@ -0,0 +1,132 @@ +/** + * Copyright (C) 2015 DataTorrent, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datatorrent.demos.pi; + +import java.net.URI; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.Operator; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.appdata.schemas.SchemaUtils; +import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery; +import com.datatorrent.lib.io.PubSubWebSocketAppDataResult; +import com.datatorrent.lib.testbench.RandomEventGenerator; + +/** + * Monte Carlo PI estimation demo : <br> + * This application computes value of PI using Monte Carlo pi estimation + * formula. + * <p> + * Very similar to PiDemo but data is also written to an App Data operator for visualization. + * <p> + * Running Java Test or Main app in IDE: + * + * <pre> + * LocalMode.runApp(new Application(), 600000); // 10 min run + * </pre> + * + * Run Success : <br> + * For successful deployment and run, user should see something like the + * following output on the console (since the input sequence of random numbers + * can vary from one run to the next, there will be some variation in the + * output values): + * + * <pre> + * 3.1430480549199085 + * 3.1423454157782515 + * 3.1431377245508982 + * 3.142078799249531 + * 2013-06-18 10:43:18,335 [main] INFO stram.StramLocalCluster run - Application finished. + * </pre> + * + * Application DAG : <br> + * <img src="doc-files/Application.gif" width=600px > <br> + * <br> + * + * Streaming Window Size : 1000 ms(1 Sec) <br> + * Operator Details : <br> + * <ul> + * <li><b>The rand Operator : </b> This operator generates random integer + * between 0-30k. <br> + * Class : {@link com.datatorrent.lib.testbench.RandomEventGenerator}<br> + * StateFull : No</li> + * <li><b>The calc operator : </b> This operator computes value of pi using + * monte carlo estimation. <br> + * Class : com.datatorrent.demos.pi.PiCalculateOperator <br> + * StateFull : No</li> + * <li><b>The operator Console: </b> This operator just outputs the input tuples + * to the console (or stdout). You can use other output adapters if needed.<br> + * </li> + * </ul> + * + * @since 0.3.2 + */ +@ApplicationAnnotation(name="PiDemoAppData") +public class ApplicationAppData implements StreamingApplication +{ + public static final String SNAPSHOT_SCHEMA = "PiDemoDataSchema.json"; + + private final Locality locality = null; + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator()); + PiCalculateOperator calc = dag.addOperator("picalc", new PiCalculateOperator()); + + + dag.addStream("rand_calc", rand.integer_data, calc.input).setLocality(locality); + + String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS); + + if (StringUtils.isEmpty(gatewayAddress)) { + throw new RuntimeException("Error: No GATEWAY_CONNECT_ADDRESS"); + } + + URI uri = URI.create("ws://" + gatewayAddress + "/pubsub"); + + AppDataSnapshotServerMap snapshotServer + = dag.addOperator("Snapshot Server", new AppDataSnapshotServerMap()); + + String snapshotServerJSON = SchemaUtils.jarResourceFileToString(SNAPSHOT_SCHEMA); + + snapshotServer.setSnapshotSchemaJSON(snapshotServerJSON); + + PubSubWebSocketAppDataQuery wsQuery = dag.addOperator("Query", new PubSubWebSocketAppDataQuery()); + PubSubWebSocketAppDataResult wsResult = dag.addOperator("QueryResult", new PubSubWebSocketAppDataResult()); + + wsQuery.setUri(uri); + wsResult.setUri(uri); + Operator.OutputPort<String> queryPort = wsQuery.outputPort; + Operator.InputPort<String> queryResultPort = wsResult.input; + + NamedValueList<Object> adaptor = dag.addOperator("adaptor", new NamedValueList<Object>()); + ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator()); + + dag.addStream("PiValues", calc.output, adaptor.inPort, console.input).setLocality(locality);; + dag.addStream("NamedPiValues", adaptor.outPort, snapshotServer.input); + dag.addStream("Query", queryPort, snapshotServer.query); + dag.addStream("Result", snapshotServer.queryResult, queryResultPort); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/85ca2e5f/demos/pi/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/demos/pi/src/main/resources/META-INF/properties.xml b/demos/pi/src/main/resources/META-INF/properties.xml index 3bbafdb..c70d0cb 100644 --- a/demos/pi/src/main/resources/META-INF/properties.xml +++ b/demos/pi/src/main/resources/META-INF/properties.xml @@ -35,12 +35,30 @@ <name>dt.application.PiDemo.operator.adaptor.valueName</name> <value>piValue</value> </property> + + <!-- PiDemoAppData --> + <property> + <name>dt.application.PiDemoAppData.operator.rand.minvalue</name> + <value>0</value> + </property> + <property> + <name>dt.application.PiDemoAppData.operator.rand.maxvalue</name> + <value>30000</value> + </property> + <property> + <name>dt.application.PiDemoAppData.operator.picalc.base</name> + <value>900000000</value> + </property> + <property> + <name>dt.application.PiDemoAppData.operator.adaptor.valueName</name> + <value>piValue</value> + </property> <property> - <name>dt.application.PiDemo.operator.Query.topic</name> + <name>dt.application.PiDemoAppData.operator.Query.topic</name> <value>PiDemoQuery</value> </property> <property> - <name>dt.application.PiDemo.operator.QueryResult.topic</name> + <name>dt.application.PiDemoAppData.operator.QueryResult.topic</name> <value>PiDemoQueryResult</value> </property>
