[
https://issues.apache.org/jira/browse/QUARKS-16?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15196414#comment-15196414
]
ASF GitHub Bot commented on QUARKS-16:
--------------------------------------
Github user ddebrunner commented on a diff in the pull request:
https://github.com/apache/incubator-quarks/pull/14#discussion_r56254806
--- Diff:
samples/topology/src/main/java/quarks/samples/topology/CombiningStreamsProcessingResults.java
---
@@ -0,0 +1,158 @@
+/*
+# Licensed Materials - Property of IBM
+# Copyright IBM Corp. 2015,2016
+*/
+
+package quarks.samples.topology;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import quarks.console.server.HttpServer;
+import quarks.function.ToIntFunction;
+import quarks.providers.development.DevelopmentProvider;
+import quarks.providers.direct.DirectProvider;
+import quarks.samples.utils.sensor.HeartMonitorSensor;
+import quarks.topology.TStream;
+import quarks.topology.Topology;
+
+/**
+ * Applying different processing against a set of streams and combining the
+ * resulting streams into a single stream.
+ *
+ * @see HeartMonitorSensor
+ */
+public class CombiningStreamsProcessingResults {
+ /**
+ * Polls a simulated heart monitor to periodically obtain blood
pressure readings.
+ * Splits the readings by blood pressure category into separate
streams.
+ * Applies different processing on each stream to generate alert
streams.
+ * Combines the alert streams into a single stream and prints the
alerts.
+ *
+ */
+ public static void main(String[] args) throws Exception {
+ HeartMonitorSensor monitor = new HeartMonitorSensor();
+
+ DirectProvider dp = new DevelopmentProvider();
+
+
System.out.println(dp.getServices().getService(HttpServer.class).getConsoleUrl());
+
+ Topology top = dp.newTopology("heartMonitor");
+
+ // Generate a stream of heart monitor readings
+ TStream<Map<String, Integer>> readings = top
+ .poll(monitor, 1, TimeUnit.MILLISECONDS)
+ .filter(tuple -> tuple.get("Systolic") > 50 &&
tuple.get("Diastolic") > 30)
+ .filter(tuple -> tuple.get("Systolic") < 200 &&
tuple.get("Diastolic") < 130);
+
+ // Split the stream by blood pressure category
+ List<TStream<Map<String, Integer>>> categories = readings.split(6,
new ToIntFunction<Map<String, Integer>>() {
+ @Override
+ public int applyAsInt(Map<String, Integer> tuple) {
+ if (tuple.get("Systolic") < 120 && tuple.get("Diastolic")
< 80) {
+ // Normal
+ return 0;
+ } else if ((tuple.get("Systolic") >= 120 &&
tuple.get("Systolic") <= 139) ||
+ (tuple.get("Diastolic") >= 80 &&
tuple.get("Diastolic") <= 89)) {
+ // Prehypertension
+ return 1;
+ } else if ((tuple.get("Systolic") >= 140 &&
tuple.get("Systolic") <= 159) ||
+ (tuple.get("Diastolic") >= 90 &&
tuple.get("Diastolic") <= 99)) {
+ // High Blood Pressure (Hypertension) Stage 1
+ return 2;
+ } else if ((tuple.get("Systolic") >= 160 &&
tuple.get("Systolic") <= 179) ||
+ (tuple.get("Diastolic") >= 100 &&
tuple.get("Diastolic") <= 109)) {
+ // High Blood Pressure (Hypertension) Stage 2
+ return 3;
+ } else if (tuple.get("Systolic") >= 180 &&
tuple.get("Diastolic") >= 110) {
+ // Hypertensive Crisis
+ return 4;
+ } else {
+ // Invalid
+ return -1;
+ }
+ }
+ });
+
+ // Get each individual stream
+ TStream<Map<String, Integer>> normal =
categories.get(0).tag("normal");
+ TStream<Map<String, Integer>> prehypertension =
categories.get(1).tag("prehypertension");
+ TStream<Map<String, Integer>> hypertension_stage1 =
categories.get(2).tag("hypertension_stage1");
+ TStream<Map<String, Integer>> hypertension_stage2 =
categories.get(3).tag("hypertension_stage2");
+ TStream<Map<String, Integer>> hypertensive =
categories.get(4).tag("hypertensive");
+
+ // Perform analytics on each stream and generate alerts for each
blood pressure category
+
+ // Category: Normal
+ TStream<String> normalAlerts = normal
+ .filter(tuple -> tuple.get("Systolic") > 80 &&
tuple.get("Diastolic") > 50)
+ .tag("normal")
+ .map(tuple -> {
+ return "All is normal. BP is " + tuple.get("Systolic")
+ "/" +
+ tuple.get("Diastolic") + ".\n"; })
+ .tag("normal");
+
+ // Category: Prehypertension category
+ TStream<String> prehypertensionAlerts = prehypertension
+ .map(tuple -> {
+ return "At high risk for developing hypertension. BP
is " +
+ tuple.get("Systolic") + "/" +
tuple.get("Diastolic") + ".\n"; })
+ .tag("prehypertension");
+
+ // Category: High Blood Pressure (Hypertension) Stage 1
+ TStream<String> hypertension_stage1Alerts = hypertension_stage1
+ .map(tuple -> {
+ return "Monitor closely, patient has high blood
pressure. " +
+ "BP is " + tuple.get("Systolic") + "/" +
tuple.get("Diastolic") + ".\n"; })
+ .tag("hypertension_stage1")
+ .modify(tuple -> "High Blood Pressure (Hypertension) Stage
1\n" + tuple)
+ .tag("hypertension_stage1");
+
+ // Category: High Blood Pressure (Hypertension) Stage 2
+ TStream<String> hypertension_stage2Alerts = hypertension_stage2
+ .filter(tuple -> tuple.get("Systolic") >= 170 &&
tuple.get("Diastolic") >= 105)
+ .tag("hypertension_stage2")
+ .peek(tuple ->
+ System.out.println("BP: " + tuple.get("Systolic") +
"/" + tuple.get("Diastolic")))
+ .map(tuple -> {
+ return "Warning! Monitor closely, patient is at risk
of a hypertensive crisis!\n"; })
+ .tag("hypertension_stage2")
+ .modify(tuple -> "High Blood Pressure (Hypertension) Stage
2\n" + tuple)
+ .tag("hypertension_stage2");
+
+ // Category: Hypertensive Crisis
+ TStream<String> hypertensiveAlerts = hypertensive
+ .filter(tuple -> tuple.get("Systolic") >= 180)
+ .tag("hypertensive")
+ .peek(tuple ->
+ System.out.println("BP: " + tuple.get("Systolic") +
"/" + tuple.get("Diastolic")))
+ .map(tuple -> { return "Emergency! See to patient
immediately!\n"; })
+ .tag("hypertensive")
+ .modify(tuple -> tuple.toUpperCase())
+ .tag("hypertensive")
+ .modify(tuple -> "Hypertensive Crisis!!!\n" + tuple)
+ .tag("hypertensive");
+
+ // Union two streams to obtain a single stream containing alerts
from the normal and
+ // prehypertension alert streams
+ TStream<String> normalAndPrehypertensionAlerts =
normalAlerts.union(prehypertensionAlerts);
--- End diff --
Just curious if there is some point you are trying to get across with the
multiple unions, rather than the equivalent single union?
> Recipe creation for split and union
> -----------------------------------
>
> Key: QUARKS-16
> URL: https://issues.apache.org/jira/browse/QUARKS-16
> Project: Quarks
> Issue Type: Improvement
> Components: Documentation
> Reporter: Queenie Ma
> Assignee: Queenie Ma
> Priority: Minor
> Labels: documentation, enhancement
> Attachments: CombiningStreamsProcessingResults.java,
> HeartMonitorSensor.java, split_union.md, split_union_topology_graph.png,
> split_union_topology_graph.png
>
>
> I wrote up a recipe on how to split and union streams. I'd like to get some
> feedback on the sample code and the recipe walkthrough.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)