[ 
https://issues.apache.org/jira/browse/QUARKS-16?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15196425#comment-15196425
 ] 

ASF GitHub Bot commented on QUARKS-16:
--------------------------------------

Github user ddebrunner commented on a diff in the pull request:

    https://github.com/apache/incubator-quarks/pull/14#discussion_r56255525
  
    --- Diff: 
samples/topology/src/main/java/quarks/samples/topology/CombiningStreamsProcessingResults.java
 ---
    @@ -0,0 +1,158 @@
    +/*
    +# Licensed Materials - Property of IBM
    +# Copyright IBM Corp. 2015,2016
    +*/
    +
    +package quarks.samples.topology;
    +
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +import quarks.console.server.HttpServer;
    +import quarks.function.ToIntFunction;
    +import quarks.providers.development.DevelopmentProvider;
    +import quarks.providers.direct.DirectProvider;
    +import quarks.samples.utils.sensor.HeartMonitorSensor;
    +import quarks.topology.TStream;
    +import quarks.topology.Topology;
    +
    +/**
    + * Applying different processing against a set of streams and combining the
    + * resulting streams into a single stream.
    + *
    + *  @see HeartMonitorSensor
    + */
    +public class CombiningStreamsProcessingResults {
    +    /**
    +     * Polls a simulated heart monitor to periodically obtain blood 
pressure readings.
    +     * Splits the readings by blood pressure category into separate 
streams.
    +     * Applies different processing on each stream to generate alert 
streams.
    +     * Combines the alert streams into a single stream and prints the 
alerts.
    +     *
    +     */
    +    public static void main(String[] args) throws Exception {
    +        HeartMonitorSensor monitor = new HeartMonitorSensor();
    +
    +        DirectProvider dp = new DevelopmentProvider();
    +
    +        
System.out.println(dp.getServices().getService(HttpServer.class).getConsoleUrl());
    +
    +        Topology top = dp.newTopology("heartMonitor");
    +
    +        // Generate a stream of heart monitor readings
    +        TStream<Map<String, Integer>> readings = top
    +                .poll(monitor, 1, TimeUnit.MILLISECONDS)
    +                .filter(tuple -> tuple.get("Systolic") > 50 && 
tuple.get("Diastolic") > 30)
    +                .filter(tuple -> tuple.get("Systolic") < 200 && 
tuple.get("Diastolic") < 130);
    +
    +        // Split the stream by blood pressure category
    +        List<TStream<Map<String, Integer>>> categories = readings.split(6, 
new ToIntFunction<Map<String, Integer>>() {
    +            @Override
    +            public int applyAsInt(Map<String, Integer> tuple) {
    +                if (tuple.get("Systolic") < 120 && tuple.get("Diastolic") 
< 80) {
    +                    // Normal
    +                    return 0;
    +                } else if ((tuple.get("Systolic") >= 120 && 
tuple.get("Systolic") <= 139) ||
    +                           (tuple.get("Diastolic") >= 80 && 
tuple.get("Diastolic") <= 89)) {
    +                    // Prehypertension
    +                    return 1;
    +                } else if ((tuple.get("Systolic") >= 140 && 
tuple.get("Systolic") <= 159) ||
    +                           (tuple.get("Diastolic") >= 90 && 
tuple.get("Diastolic") <= 99)) {
    +                    // High Blood Pressure (Hypertension) Stage 1
    +                    return 2;
    +                } else if ((tuple.get("Systolic") >= 160 && 
tuple.get("Systolic") <= 179) ||
    +                           (tuple.get("Diastolic") >= 100 && 
tuple.get("Diastolic") <= 109)) {
    +                    // High Blood Pressure (Hypertension) Stage 2
    +                    return 3;
    +                } else if (tuple.get("Systolic") >= 180 && 
tuple.get("Diastolic") >= 110)  {
    +                    // Hypertensive Crisis
    +                    return 4;
    +                } else {
    +                    // Invalid
    +                    return -1;
    +                }
    +            }
    +        });
    +
    +        // Get each individual stream
    +        TStream<Map<String, Integer>> normal = 
categories.get(0).tag("normal");
    +        TStream<Map<String, Integer>> prehypertension = 
categories.get(1).tag("prehypertension");
    +        TStream<Map<String, Integer>> hypertension_stage1 = 
categories.get(2).tag("hypertension_stage1");
    +        TStream<Map<String, Integer>> hypertension_stage2 = 
categories.get(3).tag("hypertension_stage2");
    +        TStream<Map<String, Integer>> hypertensive = 
categories.get(4).tag("hypertensive");
    +
    +        // Perform analytics on each stream and generate alerts for each 
blood pressure category
    +
    +        // Category: Normal
    +        TStream<String> normalAlerts = normal
    +                .filter(tuple -> tuple.get("Systolic") > 80 && 
tuple.get("Diastolic") > 50)
    +                .tag("normal")
    +                .map(tuple -> {
    +                    return "All is normal. BP is " + tuple.get("Systolic") 
+ "/" +
    +                            tuple.get("Diastolic") + ".\n"; })
    +                .tag("normal");
    +
    +        // Category: Prehypertension category
    +        TStream<String> prehypertensionAlerts = prehypertension
    +                .map(tuple -> {
    +                    return "At high risk for developing hypertension. BP 
is " +
    +                            tuple.get("Systolic") + "/" + 
tuple.get("Diastolic") + ".\n"; })
    +                .tag("prehypertension");
    +
    +        // Category: High Blood Pressure (Hypertension) Stage 1
    +        TStream<String> hypertension_stage1Alerts = hypertension_stage1
    +                .map(tuple -> {
    +                    return "Monitor closely, patient has high blood 
pressure. " +
    +                           "BP is " + tuple.get("Systolic") + "/" + 
tuple.get("Diastolic") + ".\n"; })
    +                .tag("hypertension_stage1")
    +                .modify(tuple -> "High Blood Pressure (Hypertension) Stage 
1\n" + tuple)
    +                .tag("hypertension_stage1");
    +
    +        // Category: High Blood Pressure (Hypertension) Stage 2
    +        TStream<String> hypertension_stage2Alerts = hypertension_stage2
    +                .filter(tuple -> tuple.get("Systolic") >= 170 && 
tuple.get("Diastolic") >= 105)
    +                .tag("hypertension_stage2")
    +                .peek(tuple ->
    +                    System.out.println("BP: " + tuple.get("Systolic") + 
"/" + tuple.get("Diastolic")))
    +                .map(tuple -> {
    +                    return "Warning! Monitor closely, patient is at risk 
of a hypertensive crisis!\n"; })
    +                .tag("hypertension_stage2")
    +                .modify(tuple -> "High Blood Pressure (Hypertension) Stage 
2\n" + tuple)
    +                .tag("hypertension_stage2");
    +
    +        // Category: Hypertensive Crisis
    +        TStream<String> hypertensiveAlerts = hypertensive
    +                .filter(tuple -> tuple.get("Systolic") >= 180)
    +                .tag("hypertensive")
    +                .peek(tuple ->
    +                    System.out.println("BP: " + tuple.get("Systolic") + 
"/" + tuple.get("Diastolic")))
    +                .map(tuple -> { return "Emergency! See to patient 
immediately!\n"; })
    +                .tag("hypertensive")
    +                .modify(tuple -> tuple.toUpperCase())
    +                .tag("hypertensive")
    +                .modify(tuple -> "Hypertensive Crisis!!!\n" + tuple)
    +                .tag("hypertensive");
    +
    +        // Union two streams to obtain a single stream containing alerts 
from the normal and
    +        // prehypertension alert streams
    +        TStream<String> normalAndPrehypertensionAlerts = 
normalAlerts.union(prehypertensionAlerts);
    --- End diff --
    
    Maybe add a comment explaining why only these two streams are unioned here; 
otherwise a reader might wonder why it was done this way. Perhaps something 
along the lines of "additional processing for these streams could go here".


> Recipe creation for split and union
> -----------------------------------
>
>                 Key: QUARKS-16
>                 URL: https://issues.apache.org/jira/browse/QUARKS-16
>             Project: Quarks
>          Issue Type: Improvement
>          Components: Documentation
>            Reporter: Queenie Ma
>            Assignee: Queenie Ma
>            Priority: Minor
>              Labels: documentation, enhancement
>         Attachments: CombiningStreamsProcessingResults.java, 
> HeartMonitorSensor.java, split_union.md, split_union_topology_graph.png, 
> split_union_topology_graph.png
>
>
> I wrote up a recipe on how to split and union streams. I'd like to get some 
> feedback on the sample code and the recipe walkthrough.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to