Github user ddebrunner commented on a diff in the pull request:
https://github.com/apache/incubator-quarks/pull/14#discussion_r56254806
--- Diff:
samples/topology/src/main/java/quarks/samples/topology/CombiningStreamsProcessingResults.java
---
@@ -0,0 +1,158 @@
+/*
+# Licensed Materials - Property of IBM
+# Copyright IBM Corp. 2015,2016
+*/
+
+package quarks.samples.topology;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import quarks.console.server.HttpServer;
+import quarks.function.ToIntFunction;
+import quarks.providers.development.DevelopmentProvider;
+import quarks.providers.direct.DirectProvider;
+import quarks.samples.utils.sensor.HeartMonitorSensor;
+import quarks.topology.TStream;
+import quarks.topology.Topology;
+
+/**
+ * Applying different processing against a set of streams and combining the
+ * resulting streams into a single stream.
+ *
+ * @see HeartMonitorSensor
+ */
+public class CombiningStreamsProcessingResults {
+ /**
+ * Polls a simulated heart monitor to periodically obtain blood
pressure readings.
+ * Splits the readings by blood pressure category into separate
streams.
+ * Applies different processing on each stream to generate alert
streams.
+ * Combines the alert streams into a single stream and prints the
alerts.
+ *
+ */
+ public static void main(String[] args) throws Exception {
+ HeartMonitorSensor monitor = new HeartMonitorSensor();
+
+ DirectProvider dp = new DevelopmentProvider();
+
+
System.out.println(dp.getServices().getService(HttpServer.class).getConsoleUrl());
+
+ Topology top = dp.newTopology("heartMonitor");
+
+ // Generate a stream of heart monitor readings
+ TStream<Map<String, Integer>> readings = top
+ .poll(monitor, 1, TimeUnit.MILLISECONDS)
+ .filter(tuple -> tuple.get("Systolic") > 50 &&
tuple.get("Diastolic") > 30)
+ .filter(tuple -> tuple.get("Systolic") < 200 &&
tuple.get("Diastolic") < 130);
+
+ // Split the stream by blood pressure category
+ List<TStream<Map<String, Integer>>> categories = readings.split(6,
new ToIntFunction<Map<String, Integer>>() {
+ @Override
+ public int applyAsInt(Map<String, Integer> tuple) {
+ if (tuple.get("Systolic") < 120 && tuple.get("Diastolic")
< 80) {
+ // Normal
+ return 0;
+ } else if ((tuple.get("Systolic") >= 120 &&
tuple.get("Systolic") <= 139) ||
+ (tuple.get("Diastolic") >= 80 &&
tuple.get("Diastolic") <= 89)) {
+ // Prehypertension
+ return 1;
+ } else if ((tuple.get("Systolic") >= 140 &&
tuple.get("Systolic") <= 159) ||
+ (tuple.get("Diastolic") >= 90 &&
tuple.get("Diastolic") <= 99)) {
+ // High Blood Pressure (Hypertension) Stage 1
+ return 2;
+ } else if ((tuple.get("Systolic") >= 160 &&
tuple.get("Systolic") <= 179) ||
+ (tuple.get("Diastolic") >= 100 &&
tuple.get("Diastolic") <= 109)) {
+ // High Blood Pressure (Hypertension) Stage 2
+ return 3;
+ } else if (tuple.get("Systolic") >= 180 &&
tuple.get("Diastolic") >= 110) {
+ // Hypertensive Crisis
+ return 4;
+ } else {
+ // Invalid
+ return -1;
+ }
+ }
+ });
+
+ // Get each individual stream
+ TStream<Map<String, Integer>> normal =
categories.get(0).tag("normal");
+ TStream<Map<String, Integer>> prehypertension =
categories.get(1).tag("prehypertension");
+ TStream<Map<String, Integer>> hypertension_stage1 =
categories.get(2).tag("hypertension_stage1");
+ TStream<Map<String, Integer>> hypertension_stage2 =
categories.get(3).tag("hypertension_stage2");
+ TStream<Map<String, Integer>> hypertensive =
categories.get(4).tag("hypertensive");
+
+ // Perform analytics on each stream and generate alerts for each
blood pressure category
+
+ // Category: Normal
+ TStream<String> normalAlerts = normal
+ .filter(tuple -> tuple.get("Systolic") > 80 &&
tuple.get("Diastolic") > 50)
+ .tag("normal")
+ .map(tuple -> {
+ return "All is normal. BP is " + tuple.get("Systolic")
+ "/" +
+ tuple.get("Diastolic") + ".\n"; })
+ .tag("normal");
+
+ // Category: Prehypertension category
+ TStream<String> prehypertensionAlerts = prehypertension
+ .map(tuple -> {
+ return "At high risk for developing hypertension. BP
is " +
+ tuple.get("Systolic") + "/" +
tuple.get("Diastolic") + ".\n"; })
+ .tag("prehypertension");
+
+ // Category: High Blood Pressure (Hypertension) Stage 1
+ TStream<String> hypertension_stage1Alerts = hypertension_stage1
+ .map(tuple -> {
+ return "Monitor closely, patient has high blood
pressure. " +
+ "BP is " + tuple.get("Systolic") + "/" +
tuple.get("Diastolic") + ".\n"; })
+ .tag("hypertension_stage1")
+ .modify(tuple -> "High Blood Pressure (Hypertension) Stage
1\n" + tuple)
+ .tag("hypertension_stage1");
+
+ // Category: High Blood Pressure (Hypertension) Stage 2
+ TStream<String> hypertension_stage2Alerts = hypertension_stage2
+ .filter(tuple -> tuple.get("Systolic") >= 170 &&
tuple.get("Diastolic") >= 105)
+ .tag("hypertension_stage2")
+ .peek(tuple ->
+ System.out.println("BP: " + tuple.get("Systolic") +
"/" + tuple.get("Diastolic")))
+ .map(tuple -> {
+ return "Warning! Monitor closely, patient is at risk
of a hypertensive crisis!\n"; })
+ .tag("hypertension_stage2")
+ .modify(tuple -> "High Blood Pressure (Hypertension) Stage
2\n" + tuple)
+ .tag("hypertension_stage2");
+
+ // Category: Hypertensive Crisis
+ TStream<String> hypertensiveAlerts = hypertensive
+ .filter(tuple -> tuple.get("Systolic") >= 180)
+ .tag("hypertensive")
+ .peek(tuple ->
+ System.out.println("BP: " + tuple.get("Systolic") +
"/" + tuple.get("Diastolic")))
+ .map(tuple -> { return "Emergency! See to patient
immediately!\n"; })
+ .tag("hypertensive")
+ .modify(tuple -> tuple.toUpperCase())
+ .tag("hypertensive")
+ .modify(tuple -> "Hypertensive Crisis!!!\n" + tuple)
+ .tag("hypertensive");
+
+ // Union two streams to obtain a single stream containing alerts
from the normal and
+ // prehypertension alert streams
+ TStream<String> normalAndPrehypertensionAlerts =
normalAlerts.union(prehypertensionAlerts);
--- End diff --
Just curious if there is some point you are trying to get across with the
multiple unions, rather than the equivalent single union?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---