Github user medcv commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6136#discussion_r193934789
  
    --- Diff: 
flink-examples/flink-examples-cep/src/main/scala/org/apache/flink/cep/examples/scala/monitoring/TemperatureMonitoring.scala
 ---
    @@ -0,0 +1,107 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.examples.scala.monitoring
    +
    +import 
org.apache.flink.cep.examples.scala.monitoring.events.{MonitoringEvent, 
TemperatureAlert, TemperatureEvent, TemperatureWarning}
    +import 
org.apache.flink.cep.examples.scala.monitoring.sources.MonitoringEventSource
    +import org.apache.flink.cep.scala.CEP
    +import org.apache.flink.cep.scala.pattern.Pattern
    +import org.apache.flink.streaming.api.TimeCharacteristic
    +import org.apache.flink.streaming.api.functions.IngestionTimeExtractor
    +import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment, createTypeInformation}
    +import org.apache.flink.streaming.api.windowing.time.Time
    +
    +/**
    +  * CEP example monitoring program.
    +  * This example program generates a stream of monitoring events which are 
analyzed using
    +  * Flink's CEP library. The input event stream consists of temperature 
and power events
    +  * from a set of racks. The goal is to detect when a rack is about to 
overheat.
    +  * In order to do that, we create a CEP pattern which generates a 
TemperatureWarning
    +  * whenever it sees two consecutive temperature events in a given time 
interval whose temperatures
    +  * are higher than a given threshold value. A warning itself is not 
critical but if we see
    +  * two warning for the same rack whose temperatures are rising, we want 
to generate an alert.
    +  * This is achieved by defining another CEP pattern which analyzes the 
stream of generated
    +  * temperature warnings.
    +  */
    +object TemperatureMonitoring {
    +
    +  private val TEMPERATURE_THRESHOLD = 100
    +
    +  def main(args: Array[String]) {
    +    println("Executing temperature monitoring Scala example.")
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +
    +    // Use ingestion time => TimeCharacteristic == EventTime + 
IngestionTimeExtractor
    +    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +
    +    // Input stream of monitoring events
    +    val inputEventStream = env.addSource(new MonitoringEventSource())
    +      .assignTimestampsAndWatermarks(new 
IngestionTimeExtractor[MonitoringEvent])
    +
    +    // Warning pattern: Two consecutive temperature events whose 
temperature is higher
    +    // than the given threshold appearing within a time interval of 10 
seconds
    +    val warningPattern = Pattern
    +      .begin[MonitoringEvent]("first")
    +        .subtype(classOf[TemperatureEvent])
    +        .where(_.temperature > TEMPERATURE_THRESHOLD)
    +      .next("second")
    +        .subtype(classOf[TemperatureEvent])
    +        .where(_.temperature > TEMPERATURE_THRESHOLD)
    +      .within(Time.seconds(10))
    +
    +    // Create a pattern stream from our warning pattern
    +    val tempPatternStream = CEP.pattern(inputEventStream.keyBy(_.rackID), 
warningPattern)
    +
    +    // Generate temperature warnings for each matched warning pattern
    +    val warnings: DataStream[TemperatureWarning] = 
tempPatternStream.select( pattern => {
    +        val first = pattern("first").head.asInstanceOf[TemperatureEvent]
    +        val second = pattern("second").head.asInstanceOf[TemperatureEvent]
    +        new TemperatureWarning(first.rackID, (first.temperature + 
second.temperature) / 2)
    +      }
    +    )
    +
    +    // Alert pattern: Two consecutive temperature warnings
    +    // appearing within a time interval of 20 seconds
    +    val alertPattern = Pattern
    +      .begin[TemperatureWarning]("first")
    +      .next("second")
    +      .within(Time.seconds(20))
    +
    +    // Create a pattern stream from our alert pattern
    +    val alertPatternStream = CEP.pattern(warnings.keyBy(_.rackID), 
alertPattern)
    +
    +    // Generate a temperature alert iff the second temperature warning's 
average temperature
    --- End diff --
    
    type `iff`


---

Reply via email to