Repository: incubator-apex-malhar Updated Branches: refs/heads/devel-3 54586f4ac -> 91321cef5
MLHR-1824 Converted to use App Data Closes #2 Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/91321cef Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/91321cef Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/91321cef Branch: refs/heads/devel-3 Commit: 91321cef55558b41e8177ca83173a51e02b71902 Parents: 54586f4 Author: Munagala V. Ramanath <[email protected]> Authored: Fri Aug 28 09:16:31 2015 -0700 Committer: Vlad Rozov <[email protected]> Committed: Mon Aug 31 14:52:27 2015 -0700 ---------------------------------------------------------------------- demos/pi/pom.xml | 8 ++ .../com/datatorrent/demos/pi/Application.java | 53 ++++++++++-- .../datatorrent/demos/pi/NamedValueList.java | 86 ++++++++++++++++++++ .../src/main/resources/META-INF/properties.xml | 12 +++ .../pi/src/main/resources/PiDemoDataSchema.json | 3 + 5 files changed, 157 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/91321cef/demos/pi/pom.xml ---------------------------------------------------------------------- diff --git a/demos/pi/pom.xml b/demos/pi/pom.xml index 33ee252..8f49a2f 100644 --- a/demos/pi/pom.xml +++ b/demos/pi/pom.xml @@ -16,4 +16,12 @@ <version>3.2.0-SNAPSHOT</version> </parent> + <dependencies> + <dependency> + <groupId>it.unimi.dsi</groupId> + <artifactId>fastutil</artifactId> + <version>6.6.4</version> + </dependency> + </dependencies> + </project> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/91321cef/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java ---------------------------------------------------------------------- diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java index 
b33a553..382d9c8 100644 --- a/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java +++ b/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java @@ -15,15 +15,23 @@ */ package com.datatorrent.demos.pi; -import org.apache.hadoop.conf.Configuration; +import java.net.URI; -import com.datatorrent.lib.io.ConsoleOutputOperator; -import com.datatorrent.lib.testbench.RandomEventGenerator; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.Operator; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.appdata.schemas.SchemaUtils; +import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery; +import com.datatorrent.lib.io.PubSubWebSocketAppDataResult; +import com.datatorrent.lib.testbench.RandomEventGenerator; + /** * Monte Carlo PI estimation demo : <br> @@ -75,6 +83,8 @@ import com.datatorrent.api.annotation.ApplicationAnnotation; @ApplicationAnnotation(name="PiDemo") public class Application implements StreamingApplication { + public static final String SNAPSHOT_SCHEMA = "PiDemoDataSchema.json"; + private final Locality locality = null; @Override @@ -82,9 +92,42 @@ public class Application implements StreamingApplication { RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator()); PiCalculateOperator calc = dag.addOperator("picalc", new PiCalculateOperator()); - ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator()); + + dag.addStream("rand_calc", rand.integer_data, calc.input).setLocality(locality); - dag.addStream("rand_console",calc.output, console.input).setLocality(locality); + + String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS); + + 
if (StringUtils.isEmpty(gatewayAddress)) { + ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator()); + dag.addStream("rand_console", calc.output, console.input).setLocality(locality); + + } else { + + URI uri = URI.create("ws://" + gatewayAddress + "/pubsub"); + + AppDataSnapshotServerMap snapshotServer + = dag.addOperator("Snapshot Server", new AppDataSnapshotServerMap()); + + String snapshotServerJSON = SchemaUtils.jarResourceFileToString(SNAPSHOT_SCHEMA); + + snapshotServer.setSnapshotSchemaJSON(snapshotServerJSON); + + PubSubWebSocketAppDataQuery wsQuery = dag.addOperator("Query", new PubSubWebSocketAppDataQuery()); + PubSubWebSocketAppDataResult wsResult = dag.addOperator("QueryResult", new PubSubWebSocketAppDataResult()); + + wsQuery.setUri(uri); + wsResult.setUri(uri); + Operator.OutputPort<String> queryPort = wsQuery.outputPort; + Operator.InputPort<String> queryResultPort = wsResult.input; + + NamedValueList<Object> adaptor = dag.addOperator("adaptor", new NamedValueList<Object>()); + + dag.addStream("PiValues", calc.output, adaptor.inPort); + dag.addStream("NamedPiValues", adaptor.outPort, snapshotServer.input); + dag.addStream("Query", queryPort, snapshotServer.query); + dag.addStream("Result", snapshotServer.queryResult, queryResultPort); + } } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/91321cef/demos/pi/src/main/java/com/datatorrent/demos/pi/NamedValueList.java ---------------------------------------------------------------------- diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/NamedValueList.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/NamedValueList.java new file mode 100644 index 0000000..f76f4af --- /dev/null +++ b/demos/pi/src/main/java/com/datatorrent/demos/pi/NamedValueList.java @@ -0,0 +1,86 @@ +/** + * Copyright (C) 2015 DataTorrent, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datatorrent.demos.pi; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.validation.constraints.NotNull; + +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Context.OperatorContext; + +/** + * <p>An operator which converts a raw value to a named value singleton list.</p> + * AppDataSnapshotServerMap.input accepts a List<Map<String,Object>> so we use this operator to + * convert individual values to a singleton list of a named value + * <p> + * @displayNamed Value + * @tags count + */ +public class NamedValueList<T> extends BaseOperator +{ + @NotNull + private String valueName; + + private List<Map<String, T>> valueList; + private Map<String, T> valueMap; + + public final transient DefaultInputPort<T> inPort = new DefaultInputPort<T>() { + @Override + public void process(T val) { + valueMap.put(valueName, val); + outPort.emit(valueList); + } + }; + + public final transient DefaultOutputPort<List<Map<String, T>>> outPort = new DefaultOutputPort<>(); + + @Override + public void setup(OperatorContext context) + { + valueMap = new HashMap<>(); + valueMap.put(valueName, null); + valueList = Collections.singletonList(valueMap); + } + + @Override + public void teardown() + { + } + + @Override + public void 
beginWindow(long windowId) + { + } + + @Override + public void endWindow() + { + } + + public String getValueName() { + return valueName; + } + + public void setValueName(String name) { + valueName = name; + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/91321cef/demos/pi/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/demos/pi/src/main/resources/META-INF/properties.xml b/demos/pi/src/main/resources/META-INF/properties.xml index 4a48fb4..3bbafdb 100644 --- a/demos/pi/src/main/resources/META-INF/properties.xml +++ b/demos/pi/src/main/resources/META-INF/properties.xml @@ -31,6 +31,18 @@ <name>dt.application.PiDemo.operator.picalc.base</name> <value>900000000</value> </property> + <property> + <name>dt.application.PiDemo.operator.adaptor.valueName</name> + <value>piValue</value> + </property> + <property> + <name>dt.application.PiDemo.operator.Query.topic</name> + <value>PiDemoQuery</value> + </property> + <property> + <name>dt.application.PiDemo.operator.QueryResult.topic</name> + <value>PiDemoQueryResult</value> + </property> <!-- PiLibraryDemo --> <property> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/91321cef/demos/pi/src/main/resources/PiDemoDataSchema.json ---------------------------------------------------------------------- diff --git a/demos/pi/src/main/resources/PiDemoDataSchema.json b/demos/pi/src/main/resources/PiDemoDataSchema.json new file mode 100644 index 0000000..47db8eb --- /dev/null +++ b/demos/pi/src/main/resources/PiDemoDataSchema.json @@ -0,0 +1,3 @@ +{ + "values": [{"name": "piValue", "type": "double"}] +}
