Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 54586f4ac -> 91321cef5


MLHR-1824 Coverted to use App Data

Closes #2


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/91321cef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/91321cef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/91321cef

Branch: refs/heads/devel-3
Commit: 91321cef55558b41e8177ca83173a51e02b71902
Parents: 54586f4
Author: Munagala V. Ramanath <[email protected]>
Authored: Fri Aug 28 09:16:31 2015 -0700
Committer: Vlad Rozov <[email protected]>
Committed: Mon Aug 31 14:52:27 2015 -0700

----------------------------------------------------------------------
 demos/pi/pom.xml                                |  8 ++
 .../com/datatorrent/demos/pi/Application.java   | 53 ++++++++++--
 .../datatorrent/demos/pi/NamedValueList.java    | 86 ++++++++++++++++++++
 .../src/main/resources/META-INF/properties.xml  | 12 +++
 .../pi/src/main/resources/PiDemoDataSchema.json |  3 +
 5 files changed, 157 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/91321cef/demos/pi/pom.xml
----------------------------------------------------------------------
diff --git a/demos/pi/pom.xml b/demos/pi/pom.xml
index 33ee252..8f49a2f 100644
--- a/demos/pi/pom.xml
+++ b/demos/pi/pom.xml
@@ -16,4 +16,12 @@
     <version>3.2.0-SNAPSHOT</version>
   </parent>
 
+  <dependencies>
+    <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil</artifactId>
+      <version>6.6.4</version>
+    </dependency>
+  </dependencies>
+
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/91321cef/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java
----------------------------------------------------------------------
diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java 
b/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java
index b33a553..382d9c8 100644
--- a/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java
+++ b/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java
@@ -15,15 +15,23 @@
  */
 package com.datatorrent.demos.pi;
 
-import org.apache.hadoop.conf.Configuration;
+import java.net.URI;
 
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.testbench.RandomEventGenerator;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.Operator;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.appdata.schemas.SchemaUtils;
+import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery;
+import com.datatorrent.lib.io.PubSubWebSocketAppDataResult;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
+
 
 /**
  * Monte Carlo PI estimation demo : <br>
@@ -75,6 +83,8 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
 @ApplicationAnnotation(name="PiDemo")
 public class Application implements StreamingApplication
 {
+  public static final String SNAPSHOT_SCHEMA = "PiDemoDataSchema.json";
+
   private final Locality locality = null;
 
   @Override
@@ -82,9 +92,42 @@ public class Application implements StreamingApplication
   {
     RandomEventGenerator rand = dag.addOperator("rand", new 
RandomEventGenerator());
     PiCalculateOperator calc = dag.addOperator("picalc", new 
PiCalculateOperator());
-    ConsoleOutputOperator console = dag.addOperator("console", new 
ConsoleOutputOperator());
+
+
     dag.addStream("rand_calc", rand.integer_data, 
calc.input).setLocality(locality);
-    dag.addStream("rand_console",calc.output, 
console.input).setLocality(locality);
+
+    String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
+
+    if (StringUtils.isEmpty(gatewayAddress)) {
+      ConsoleOutputOperator console = dag.addOperator("console", new 
ConsoleOutputOperator());
+      dag.addStream("rand_console", calc.output, 
console.input).setLocality(locality);
+
+    } else {
+
+      URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
+
+      AppDataSnapshotServerMap snapshotServer
+        = dag.addOperator("Snapshot Server", new AppDataSnapshotServerMap());
+
+      String snapshotServerJSON = 
SchemaUtils.jarResourceFileToString(SNAPSHOT_SCHEMA);
+
+      snapshotServer.setSnapshotSchemaJSON(snapshotServerJSON);
+
+      PubSubWebSocketAppDataQuery wsQuery = dag.addOperator("Query", new 
PubSubWebSocketAppDataQuery());
+      PubSubWebSocketAppDataResult wsResult = dag.addOperator("QueryResult", 
new PubSubWebSocketAppDataResult());
+
+      wsQuery.setUri(uri);
+      wsResult.setUri(uri);
+      Operator.OutputPort<String> queryPort = wsQuery.outputPort;
+      Operator.InputPort<String> queryResultPort = wsResult.input;
+
+      NamedValueList<Object> adaptor = dag.addOperator("adaptor", new 
NamedValueList<Object>());
+
+      dag.addStream("PiValues", calc.output, adaptor.inPort);
+      dag.addStream("NamedPiValues", adaptor.outPort, snapshotServer.input);
+      dag.addStream("Query", queryPort, snapshotServer.query);
+      dag.addStream("Result", snapshotServer.queryResult, queryResultPort);
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/91321cef/demos/pi/src/main/java/com/datatorrent/demos/pi/NamedValueList.java
----------------------------------------------------------------------
diff --git 
a/demos/pi/src/main/java/com/datatorrent/demos/pi/NamedValueList.java 
b/demos/pi/src/main/java/com/datatorrent/demos/pi/NamedValueList.java
new file mode 100644
index 0000000..f76f4af
--- /dev/null
+++ b/demos/pi/src/main/java/com/datatorrent/demos/pi/NamedValueList.java
@@ -0,0 +1,86 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.demos.pi;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Context.OperatorContext;
+
+/**
+ * <p>An operator which converts a raw value to a named value singleton 
list.</p>
+ * AppDataSnapshotServerMap.input accepts a List<Map<String,Object>> so we use 
this operator to
+ * convert individual values to a singleton list of a named value
+ * <p>
+ * @displayNamed Value
+ * @tags count
+ */
+public class NamedValueList<T> extends BaseOperator
+{
+  @NotNull
+  private String valueName;
+
+  private List<Map<String, T>> valueList;
+  private Map<String, T> valueMap;
+
+  public final transient DefaultInputPort<T> inPort = new 
DefaultInputPort<T>() {
+    @Override
+    public void process(T val) {
+      valueMap.put(valueName, val);
+      outPort.emit(valueList);
+    }
+  };
+
+  public final transient DefaultOutputPort<List<Map<String, T>>> outPort = new 
DefaultOutputPort<>();
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    valueMap = new HashMap<>();
+    valueMap.put(valueName, null);
+    valueList = Collections.singletonList(valueMap);
+  }
+
+  @Override
+  public void teardown()
+  {
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+  }
+
+  @Override
+  public void endWindow()
+  {
+  }
+
+  public String getValueName() {
+    return valueName;
+  }
+
+  public void setValueName(String name) {
+    valueName = name;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/91321cef/demos/pi/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/demos/pi/src/main/resources/META-INF/properties.xml 
b/demos/pi/src/main/resources/META-INF/properties.xml
index 4a48fb4..3bbafdb 100644
--- a/demos/pi/src/main/resources/META-INF/properties.xml
+++ b/demos/pi/src/main/resources/META-INF/properties.xml
@@ -31,6 +31,18 @@
     <name>dt.application.PiDemo.operator.picalc.base</name>
     <value>900000000</value>
   </property>
+  <property>
+    <name>dt.application.PiDemo.operator.adaptor.valueName</name>
+    <value>piValue</value>
+  </property>
+  <property>
+    <name>dt.application.PiDemo.operator.Query.topic</name>
+    <value>PiDemoQuery</value>
+  </property>
+  <property>
+    <name>dt.application.PiDemo.operator.QueryResult.topic</name>
+    <value>PiDemoQueryResult</value>
+  </property>
 
   <!-- PiLibraryDemo -->
   <property>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/91321cef/demos/pi/src/main/resources/PiDemoDataSchema.json
----------------------------------------------------------------------
diff --git a/demos/pi/src/main/resources/PiDemoDataSchema.json 
b/demos/pi/src/main/resources/PiDemoDataSchema.json
new file mode 100644
index 0000000..47db8eb
--- /dev/null
+++ b/demos/pi/src/main/resources/PiDemoDataSchema.json
@@ -0,0 +1,3 @@
+{
+  "values": [{"name": "piValue", "type": "double"}]
+}

Reply via email to