[FLINK-8548] [examples] Add state machine example

This adds an example of using a state machine for pattern validation.
The example illustrates the use of state and the kafka connector.

This closes #5401


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/80883fee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/80883fee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/80883fee

Branch: refs/heads/master
Commit: 80883feeeba5232ecd41fe6759c69bf2ff1a6483
Parents: 85bfc07
Author: Stephan Ewen <[email protected]>
Authored: Thu Feb 1 19:51:59 2018 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Sun Feb 18 22:24:34 2018 +0100

----------------------------------------------------------------------
 flink-examples/flink-examples-streaming/pom.xml |  36 ++-
 .../streaming/examples/statemachine/README.md   |  52 ++++
 .../statemachine/StateMachineExample.java       | 161 ++++++++++++
 .../statemachine/dfa/EventTypeAndState.java     |  36 +++
 .../examples/statemachine/dfa/State.java        | 160 ++++++++++++
 .../examples/statemachine/dfa/Transition.java   | 101 ++++++++
 .../examples/statemachine/event/Alert.java      |  91 +++++++
 .../examples/statemachine/event/Event.java      | 104 ++++++++
 .../examples/statemachine/event/EventType.java  |  27 ++
 .../statemachine/generator/EventsGenerator.java | 164 ++++++++++++
 .../generator/EventsGeneratorSource.java        |  68 +++++
 .../generator/StandaloneThreadedGenerator.java  | 253 +++++++++++++++++++
 .../statemachine/kafka/EventDeSerializer.java   |  63 +++++
 .../kafka/KafkaStandaloneGenerator.java         |  97 +++++++
 14 files changed, 1412 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/80883fee/flink-examples/flink-examples-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/pom.xml 
b/flink-examples/flink-examples-streaming/pom.xml
index e3d51a9..c9367b7 100644
--- a/flink-examples/flink-examples-streaming/pom.xml
+++ b/flink-examples/flink-examples-streaming/pom.xml
@@ -358,7 +358,6 @@ under the License.
                                                        </includes>
                                                </configuration>
                                        </execution>
-
                                </executions>
                        </plugin>
 
@@ -487,11 +486,45 @@ under the License.
                                </executions>
                        </plugin>
 
+
                        <!-- Use the shade plugin to build a fat jar for the 
kafka example -->
                        <plugin>
                                <groupId>org.apache.maven.plugins</groupId>
                                <artifactId>maven-shade-plugin</artifactId>
                                <executions>
+
+                                       <!-- State Machine Example -->
+                                       <execution>
+                                               <id>state-machine-example</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       
<shadeTestJar>false</shadeTestJar>
+                                                       
<shadedArtifactAttached>false</shadedArtifactAttached>
+                                                       
<createDependencyReducedPom>false</createDependencyReducedPom>
+                                                       <transformers>
+                                                               <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                                                       
<mainClass>org.apache.flink.streaming.examples.statemachine.StateMachineExample</mainClass>
+                                                               </transformer>
+                                                       </transformers>
+                                                       
<finalName>StateMachineExample</finalName>
+                                                       <filters>
+                                                               <filter>
+                                                                       
<artifact>*</artifact>
+                                                                       
<includes>
+                                                                               
<include>org/apache/flink/streaming/examples/statemachine/**</include>
+                                                                               
<include>org/apache/flink/streaming/util/serialization/**</include>
+                                                                               
<include>org/apache/flink/streaming/connectors/kafka/**</include>
+                                                                               
<include>org/apache/kafka/**</include>
+                                                                               
<include>kafka/</include>
+                                                                       
</includes>
+                                                               </filter>
+                                                       </filters>
+                                               </configuration>
+                                       </execution>
+
                                        <execution>
                                                
<id>fat-jar-kafka-010-example</id>
                                                <phase>package</phase>
@@ -528,6 +561,7 @@ under the License.
                                                        </filters>
                                                </configuration>
                                        </execution>
+
                                        <execution>
                                                <id>fat-jar-twitter-example</id>
                                                <phase>package</phase>

http://git-wip-us.apache.org/repos/asf/flink/blob/80883fee/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/README.md
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/README.md
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/README.md
new file mode 100644
index 0000000..6b334a3
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/README.md
@@ -0,0 +1,52 @@
+Example: Running a state machine for pattern detection
+======================================================
+
+This example illustrates a minimal roll-your-own event pattern detection 
scenario,
+using a simple state machine that is evaluated over the stream.
+
+While this example is much simpler and more manual than what the CEP library 
supports,
+it illustrates the use of event processing and state management for a moderately
+complex scenario.
+
+**Scenario Description**
+
+Events in streams are expected to occur in certain patterns. Any deviation from
+these patterns indicates an anomaly that the streaming system should recognize 
and that
+should trigger an alert.
+
+You can, for example, think of events as being generated by network devices 
and services,
+such as firewalls, or logins and registrations with an authentication service, 
etc. A deviation
+from the expected pattern might indicate an intrusion.
+
+The event patterns are tracked per interacting party (here simplified per 
source IP address)
+and are validated by a state machine. The state machine's states define what 
possible
+events may occur next, and what new states these events will result in.
+
+The following diagram depicts the state machine used in this example.
+
+```
+           +--<a>--> W --<b>--> Y --<e>---+
+           |                    ^         |
+   INITIAL-+                    |         |
+           |                    |         +--> (Z) -----<g>---> TERM
+           +--<c>--> X --<b>----+         |
+                     |                    |
+                     +--------<d>---------+
+```
+
+**Example Program**
+
+The main class of this example program is 
`org.apache.flink.streaming.examples.statemachine.StateMachineExample`.
+The core logic is in the `flatMap` function, which runs the state machines per 
IP address.
+
+The streaming data flow is as shown below, where the source stream may come 
from either
+an embedded data generator, or from a Kafka topic:
+
+```
+ [ stream partition 1] --> source --> partition -+---> flatMap(state machine) 
--> sink
+                                            \/
+                                            /\
+ [ stream partition 2] --> source --> partition -+---> flatMap(state machine) 
--> sink
+```
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/80883fee/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
new file mode 100644
index 0000000..052e954
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.statemachine;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
+import org.apache.flink.streaming.examples.statemachine.dfa.State;
+import org.apache.flink.streaming.examples.statemachine.event.Alert;
+import org.apache.flink.streaming.examples.statemachine.event.Event;
+import 
org.apache.flink.streaming.examples.statemachine.generator.EventsGeneratorSource;
+import 
org.apache.flink.streaming.examples.statemachine.kafka.EventDeSerializer;
+import org.apache.flink.util.Collector;
+
+import java.util.Properties;
+
+/**
+ * Main class of the state machine example.
+ * This class implements the streaming application that receives the stream of 
events and evaluates
+ * a state machine (per originating address) to validate that the events follow
+ * the state machine's rules.
+ */
+public class StateMachineExample {
+
+       /**
+        * Main entry point for the program.
+        *
+        * @param args The command line arguments (--kafka-topic, --brokers, --error-rate, --sleep).
+        */
+       public static void main(String[] args) throws Exception {
+
+               // ---- print some usage help ----
+
+               System.out.println("Usage with built-in data generator: 
StateMachineExample [--error-rate <probability-of-invalid-transition>] [--sleep 
<sleep-per-record-in-ms>]");
+               System.out.println("Usage with Kafka: StateMachineExample 
--kafka-topic <topic> [--brokers <brokers>]");
+               System.out.println();
+
+               // ---- determine whether to use the built-in source, or read 
from Kafka ----
+
+               final SourceFunction<Event> source;
+               final ParameterTool params = ParameterTool.fromArgs(args);
+
+               if (params.has("kafka-topic")) {
+                       // set up the Kafka reader
+                       String kafkaTopic = params.get("kafka-topic");
+                       String brokers = params.get("brokers", 
"localhost:9092");
+
+                       System.out.printf("Reading from kafka topic %s @ %s\n", 
kafkaTopic, brokers);
+                       System.out.println();
+
+                       Properties kafkaProps = new Properties();
+                       kafkaProps.setProperty("bootstrap.servers", brokers);
+
+                       FlinkKafkaConsumer010<Event> kafka = new 
FlinkKafkaConsumer010<>(kafkaTopic, new EventDeSerializer(), kafkaProps);
+                       kafka.setStartFromLatest();
+                       kafka.setCommitOffsetsOnCheckpoints(false);
+                       source = kafka;
+               }
+               else {
+                       double errorRate = params.getDouble("error-rate", 0.0);
+                       int sleep = params.getInt("sleep", 1);
+
+                       System.out.printf("Using standalone source with error 
rate %f and sleep delay %s millis\n", errorRate, sleep);
+                       System.out.println();
+
+                       source = new EventsGeneratorSource(errorRate, sleep);
+               }
+
+               // ---- main program ----
+
+               // create the environment to create streams and configure 
execution
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.enableCheckpointing(5000);
+
+               DataStream<Event> events = env.addSource(source);
+
+               DataStream<Alert> alerts = events
+                               // partition on the address to make sure equal 
addresses
+                               // end up in the same state machine flatMap 
function
+                               .keyBy(Event::sourceAddress)
+
+                               // the function that evaluates the state 
machine over the sequence of events
+                               .flatMap(new StateMachineMapper());
+
+               // output the alerts to std-out
+               alerts.print();
+
+               // trigger program execution
+               env.execute();
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * The function that maintains the per-IP-address state machines and 
verifies that the
+        * events are consistent with the current state of the state machine. 
If the event is not
+        * consistent with the current state, the function produces an alert.
+        */
+       @SuppressWarnings("serial")
+       static class StateMachineMapper extends RichFlatMapFunction<Event, 
Alert> {
+
+               /** The state machine's current state for the current key (source address). */
+               private ValueState<State> currentState;
+
+               @Override
+               public void open(Configuration conf) {
+                       // get access to the state object
+                       currentState = getRuntimeContext().getState(
+                                               new 
ValueStateDescriptor<>("state", State.class));
+               }
+
+               @Override
+               public void flatMap(Event evt, Collector<Alert> out) throws 
Exception {
+                       // get the current state for the key (source address)
+                       // if no state exists yet, the state must be the state 
machine's initial state
+                       State state = currentState.value();
+                       if (state == null) {
+                               state = State.Initial;
+                       }
+
+                       // ask the state machine what state we should go to 
based on the given event
+                       State nextState = state.transition(evt.type());
+
+                       if (nextState == State.InvalidTransition) {
+                               // the current event resulted in an invalid 
transition
+                               // raise an alert; the stored state is left unchanged
+                               out.collect(new Alert(evt.sourceAddress(), 
state, evt.type()));
+                       }
+                       else if (nextState.isTerminal()) {
+                               // we reached a terminal state, clean up the 
current state
+                               currentState.clear();
+                       }
+                       else {
+                               // remember the new state
+                               currentState.update(nextState);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/80883fee/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/dfa/EventTypeAndState.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/dfa/EventTypeAndState.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/dfa/EventTypeAndState.java
new file mode 100644
index 0000000..df958ae
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/dfa/EventTypeAndState.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.statemachine.dfa;
+
+import org.apache.flink.streaming.examples.statemachine.event.EventType;
+
+/**
+ * Simple combination of an {@link EventType} and a {@link State}, held in final fields.
+ */
+public class EventTypeAndState {
+
+       public final EventType eventType;
+
+       public final State state;
+
+       public EventTypeAndState(EventType eventType, State state) {
+               this.eventType = eventType;
+               this.state = state;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/80883fee/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/dfa/State.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/dfa/State.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/dfa/State.java
new file mode 100644
index 0000000..8c7b0e2
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/dfa/State.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.statemachine.dfa;
+
+import org.apache.flink.streaming.examples.statemachine.event.EventType;
+
+import java.util.Random;
+
+/**
+ * The State captures the main functionality of the state machine. It 
represents
+ * a specific state in the state machine, and holds all transitions possible
+ * from a specific state.
+ *
+ * <p>The state transition diagram is as follows:
+ * <pre>
+ *           +--[a]--> W --[b]--> Y --[e]---+
+ *           |                    ^         |
+ *   Initial-+                    |         |
+ *           |                    |         +--> (Z)-----[g]---> Terminal
+ *           +--[c]--> X --[b]----+         |
+ *                     |                    |
+ *                     +--------[d]---------+
+ * </pre>
+ */
+public enum State {
+
+       /**
+        * The terminal state in the state machine.
+        */
+       Terminal,
+
+       /**
+        * Special state returned by the State.transition(...) function when 
attempting
+        * an illegal state transition.
+        */
+       InvalidTransition,
+
+       /**
+        * State 'Z'.
+        */
+       Z (new Transition(EventType.g, Terminal, 1.0f)),
+
+       /**
+        * State 'Y'.
+        */
+       Y (new Transition(EventType.e, Z, 1.0f)),
+
+       /**
+        * State 'X'.
+        */
+       X (new Transition(EventType.b, Y, 0.2f), new Transition(EventType.d, Z, 
0.8f)),
+
+       /**
+        * State 'W'.
+        */
+       W (new Transition(EventType.b, Y, 1.0f)),
+
+       /**
+        * The initial state from which all state sequences start.
+        */
+       Initial(new Transition(EventType.a, W, 0.6f), new 
Transition(EventType.c, X, 0.4f));
+
+       // 
------------------------------------------------------------------------
+
+       private final Transition[] transitions;
+
+       State(Transition... transitions) {
+               this.transitions = transitions;
+       }
+
+       /**
+        * Checks if this state is a terminal state.
+        * A terminal state has no outgoing transitions (this also holds for {@link #InvalidTransition}).
+        */
+       public boolean isTerminal() {
+               return transitions.length == 0;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Gets the state after transitioning from this state based on the 
given event.
+        * If the transition is valid, this returns the new state, and if this 
transition
+        * is illegal, it returns {@link #InvalidTransition}.
+        *
+        * @param evt The event that defined the transition.
+        * @return The new state, or {@link #InvalidTransition}.
+        */
+       public State transition(EventType evt) {
+               for (Transition t : transitions) {
+                       if (t.eventType() == evt) {
+                               return t.targetState();
+                       }
+               }
+
+               // no transition found
+               return InvalidTransition;
+       }
+
+       /**
+        * Picks a random transition, based on the probabilities of the 
outgoing transitions
+        * of this state. Throws a RuntimeException if this state has no outgoing transitions.
+        *
+        * @param rnd The random number generator to use.
+        * @return A pair of (transition event, new state).
+        */
+       public EventTypeAndState randomTransition(Random rnd) {
+               if (isTerminal()) {
+                       throw new RuntimeException("Cannot transition from 
state " + name());
+               }
+               else {
+                       final float p = rnd.nextFloat();
+                       float mass = 0.0f;
+                       Transition transition = null;
+
+                       for (Transition t : transitions) {
+                               mass += t.prob();
+                               if (p <= mass) {
+                                       transition = t;
+                                       break;
+                               }
+                       }
+
+                       assert transition != null; // holds only if the outgoing probabilities sum up to 1.0
+                       return new EventTypeAndState(transition.eventType(), 
transition.targetState());
+               }
+       }
+
+       /**
+        * Returns an event type that, if applied as a transition on this 
state, will result
+        * in an illegal state transition.
+        *
+        * @param rnd The random number generator to use.
+        * @return An event type for an illegal state transition.
+        */
+       public EventType randomInvalidTransition(Random rnd) {
+               while (true) {
+                       EventType candidate = 
EventType.values()[rnd.nextInt(EventType.values().length)];
+                       if (transition(candidate) == InvalidTransition) {
+                               return candidate;
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/80883fee/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/dfa/Transition.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/dfa/Transition.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/dfa/Transition.java
new file mode 100644
index 0000000..470831d
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/dfa/Transition.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.statemachine.dfa;
+
+import org.apache.flink.streaming.examples.statemachine.event.EventType;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A possible transition on a given event into a target state. The transition
+ * belongs to its originating state and has an associated probability that is
+ * used to generate random transition events.
+ */
+public class Transition implements Serializable {
+
+       // This class is serializable to be able to interact cleanly with enums.
+       private static final long serialVersionUID = 1L;
+
+       /** The event that triggers the transition. */
+       private final EventType eventType;
+
+       /** The target state after the transition. */
+       private final State targetState;
+
+       /** The probability of the transition, used when picking a random transition. */
+       private final float prob;
+
+       /**
+        * Creates a new transition.
+        *
+        * @param eventType The event that triggers the transition, must not be null.
+        * @param targetState The target state after the transition, must not be null.
+        * @param prob The probability of the transition.
+        */
+       public Transition(EventType eventType, State targetState, float prob) {
+               this.eventType = checkNotNull(eventType);
+               this.targetState = checkNotNull(targetState);
+               this.prob = prob;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       public EventType eventType() {
+               return eventType;
+       }
+
+       public State targetState() {
+               return targetState;
+       }
+
+       public float prob() {
+               return prob;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public boolean equals(Object obj) {
+               if (this == obj) {
+                       return true;
+               }
+               else if (obj == null || getClass() != obj.getClass()) {
+                       return false;
+               }
+               else {
+                       final Transition that = (Transition) obj;
+                       return this.eventType == that.eventType &&
+                                       this.targetState == that.targetState &&
+                                       Float.compare(this.prob, that.prob) == 
0;
+               }
+       }
+
+       @Override
+       public int hashCode() {
+               int code = 31 * eventType.hashCode() + targetState.hashCode();
+               return 31 * code + (prob != +0.0f ? Float.floatToIntBits(prob) 
: 0);
+       }
+
+       @Override
+       public String toString() {
+               return "--[" + eventType.name() + "]--> " + targetState.name() 
+ " (" + prob + ')';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/80883fee/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/event/Alert.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/event/Alert.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/event/Alert.java
new file mode 100644
index 0000000..9b3a5f4
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/event/Alert.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.statemachine.event;
+
+import org.apache.flink.streaming.examples.statemachine.dfa.State;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Data type for alerts: an address, the state found for it, and the transition
+ * that was considered invalid.
+ */
+public class Alert {
+
+       private final int address;
+
+       private final State state;
+
+       private final EventType transition;
+
+       /**
+        * Builds an alert from its three components.
+        *
+        * @param address The originating address (think 32 bit IPv4 address).
+        * @param state The state that the event state machine found; must not be null.
+        * @param transition The transition that was considered invalid; must not be null.
+        */
+       public Alert(int address, State state, EventType transition) {
+               this.address = address;
+               this.state = checkNotNull(state);
+               this.transition = checkNotNull(transition);
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Gets the originating address.
+        */
+       public int address() {
+               return address;
+       }
+
+       /**
+        * Gets the state the alert refers to.
+        */
+       public State state() {
+               return state;
+       }
+
+       /**
+        * Gets the event type of the invalid transition.
+        */
+       public EventType transition() {
+               return transition;
+       }
+
+       // ------------------------------------------------------------------------
+
+       @Override
+       public boolean equals(Object obj) {
+               if (this == obj) {
+                       return true;
+               }
+               if (obj == null || getClass() != obj.getClass()) {
+                       return false;
+               }
+
+               final Alert other = (Alert) obj;
+               return address == other.address
+                               && transition == other.transition
+                               && state == other.state;
+       }
+
+       @Override
+       public int hashCode() {
+               final int addressAndState = 31 * address + state.hashCode();
+               return 31 * addressAndState + transition.hashCode();
+       }
+
+       @Override
+       public String toString() {
+               final String formattedAddress = Event.formatAddress(address);
+               return "ALERT " + formattedAddress + " : " + state.name() + " -> " + transition.name();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/80883fee/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/event/Event.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/event/Event.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/event/Event.java
new file mode 100644
index 0000000..cfe4cfc
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/event/Event.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.statemachine.event;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Data type for events. An event carries the address it originated from and
+ * its event type.
+ */
+public class Event {
+
+       private final EventType type;
+
+       private final int sourceAddress;
+
+       /**
+        * Builds an event of the given type for the given source address.
+        *
+        * @param type The event type; must not be null.
+        * @param sourceAddress The originating address (think 32 bit IPv4 address).
+        */
+       public Event(EventType type, int sourceAddress) {
+               this.sourceAddress = sourceAddress;
+               this.type = checkNotNull(type);
+       }
+
+       /**
+        * Returns the type of this event.
+        */
+       public EventType type() {
+               return type;
+       }
+
+       /**
+        * Returns the address this event originated from.
+        */
+       public int sourceAddress() {
+               return sourceAddress;
+       }
+
+       // ------------------------------------------------------------------------
+       //  Miscellaneous
+       // ------------------------------------------------------------------------
+
+       @Override
+       public boolean equals(Object obj) {
+               if (this == obj) {
+                       return true;
+               }
+               if (obj == null || getClass() != obj.getClass()) {
+                       return false;
+               }
+
+               final Event other = (Event) obj;
+               return type == other.type && sourceAddress == other.sourceAddress;
+       }
+
+       @Override
+       public int hashCode() {
+               final int typeHash = type.hashCode();
+               return 31 * typeHash + sourceAddress;
+       }
+
+       @Override
+       public String toString() {
+               final String formattedAddress = formatAddress(sourceAddress);
+               return "Event " + formattedAddress + " : " + type.name();
+       }
+
+       // ------------------------------------------------------------------------
+       //  Utils
+       // ------------------------------------------------------------------------
+
+       /**
+        * Renders a 32 bit integer as a dotted IPv4 address string. The most significant
+        * byte of the integer becomes the first component of the string.
+        *
+        * @param address The address, MSB first.
+        * @return The IP address string.
+        */
+       public static String formatAddress(int address) {
+               final StringBuilder dotted = new StringBuilder(15);
+
+               // walk the four bytes from the most significant to the least significant one
+               for (int shift = 24; shift >= 0; shift -= 8) {
+                       if (shift != 24) {
+                               dotted.append('.');
+                       }
+                       dotted.append((address >>> shift) & 0xff);
+               }
+
+               return dotted.toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/80883fee/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/event/EventType.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/event/EventType.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/event/EventType.java
new file mode 100644
index 0000000..d7482ef
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/event/EventType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.statemachine.event;
+
+/**
+ * Enumerates the types of events that the state machine processes.
+ */
+public enum EventType {
+       a,
+       b,
+       c,
+       d,
+       e,
+       f,
+       g
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/80883fee/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/EventsGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/EventsGenerator.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/EventsGenerator.java
new file mode 100644
index 0000000..ed3c45b
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/EventsGenerator.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.statemachine.generator;
+
+import org.apache.flink.streaming.examples.statemachine.dfa.EventTypeAndState;
+import org.apache.flink.streaming.examples.statemachine.dfa.State;
+import org.apache.flink.streaming.examples.statemachine.event.Event;
+import org.apache.flink.streaming.examples.statemachine.event.EventType;
+
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map.Entry;
+import java.util.Random;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A generator for events. The generator internally maintains a series of state
+ * machines (addresses and current associated state) and returns transition events
+ * from those state machines. Each time the next event is generated, this
+ * generator picks a random state machine and creates a random transition on that
+ * state machine.
+ *
+ * <p>The generator randomly adds new state machines, and removes state machines as
+ * soon as they reach the terminal state. This implementation maintains up to
+ * 1000 state machines concurrently.
+ */
+public class EventsGenerator {
+
+       /** The random number generator. */
+       private final Random rnd;
+
+       /** The currently active state machines, keyed by address. */
+       private final LinkedHashMap<Integer, State> states;
+
+       /** Probability with which this generator generates an illegal state transition. */
+       private final double errorProb;
+
+       /**
+        * Creates a generator with an error probability of zero.
+        */
+       public EventsGenerator() {
+               this(0.0);
+       }
+
+       /**
+        * Creates a generator with the given error probability.
+        *
+        * @param errorProb The probability of generating an illegal transition, in [0.0, 1.0].
+        */
+       public EventsGenerator(double errorProb) {
+               checkArgument(errorProb >= 0.0 && errorProb <= 1.0, "Invalid error probability");
+               this.errorProb = errorProb;
+
+               this.rnd = new Random();
+               this.states = new LinkedHashMap<>();
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Creates a new random event. This method randomly picks either
+        * one of its currently running state machines, or starts a new state machine for
+        * a random IP address.
+        *
+        * <p>With {@link #errorProb} probability, the generated event will be from an illegal state
+        * transition of one of the currently running state machines.
+        *
+        * @param minIp The lower bound for the range from which a new IP address may be picked.
+        * @param maxIp The upper bound for the range from which a new IP address may be picked.
+        * @return A next random event.
+        * @throws IllegalArgumentException If a new address has to be drawn and {@code maxIp - minIp}
+        *         is not positive (raised by {@link Random#nextInt(int)}).
+        */
+       public Event next(int minIp, int maxIp) {
+               // Retry in a loop rather than by recursion: a long run of address collisions
+               // must not be able to exhaust the call stack. Every retry starts from a fresh
+               // random draw, exactly as a recursive call would.
+               while (true) {
+                       final double p = rnd.nextDouble();
+
+                       if (p * 1000 >= states.size()) {
+                               // create a new state machine
+                               final int nextIP = rnd.nextInt(maxIp - minIp) + minIp;
+
+                               if (!states.containsKey(nextIP)) {
+                                       EventTypeAndState eventAndState = State.Initial.randomTransition(rnd);
+                                       states.put(nextIP, eventAndState.state);
+                                       return new Event(eventAndState.eventType, nextIP);
+                               }
+
+                               // collision on IP address, try again
+                       }
+                       else {
+                               // pick an existing state machine
+
+                               // skip over some elements in the linked map, then take the next
+                               // update it, and insert it at the end
+
+                               int numToSkip = Math.min(20, rnd.nextInt(states.size()));
+                               Iterator<Entry<Integer, State>> iter = states.entrySet().iterator();
+
+                               for (int i = numToSkip; i > 0; --i) {
+                                       iter.next();
+                               }
+
+                               Entry<Integer, State> entry = iter.next();
+                               State currentState = entry.getValue();
+                               int address = entry.getKey();
+
+                               iter.remove();
+
+                               if (p < errorProb) {
+                                       // the state machine is not re-inserted after an illegal transition
+                                       EventType event = currentState.randomInvalidTransition(rnd);
+                                       return new Event(event, address);
+                               }
+                               else {
+                                       EventTypeAndState eventAndState = currentState.randomTransition(rnd);
+                                       if (!eventAndState.state.isTerminal()) {
+                                               // reinsert
+                                               states.put(address, eventAndState.state);
+                                       }
+
+                                       return new Event(eventAndState.eventType, address);
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Creates an event for an illegal state transition of one of the internal
+        * state machines. The affected state machine is removed from this generator.
+        * If the generator currently holds no state machines (for example, because no
+        * call to {@link #next(int, int)} was made, yet), this will return null.
+        *
+        * @return An event for an illegal state transition, or null, if not possible.
+        */
+       @Nullable
+       public Event nextInvalid() {
+               final Iterator<Entry<Integer, State>> iter = states.entrySet().iterator();
+               if (iter.hasNext()) {
+                       final Entry<Integer, State> entry = iter.next();
+
+                       State currentState = entry.getValue();
+                       int address = entry.getKey();
+                       iter.remove();
+
+                       EventType event = currentState.randomInvalidTransition(rnd);
+                       return new Event(event, address);
+               }
+               else {
+                       return null;
+               }
+       }
+
+       /**
+        * Gets the number of state machines that are currently active.
+        */
+       public int numActiveEntries() {
+               return states.size();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/80883fee/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/EventsGeneratorSource.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/EventsGeneratorSource.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/EventsGeneratorSource.java
new file mode 100644
index 0000000..74fa9ed
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/EventsGeneratorSource.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.statemachine.generator;
+
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.examples.statemachine.event.Event;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * An event stream source that generates the events on the fly. Useful for
+ * self-contained demos.
+ */
+@SuppressWarnings("serial")
+public class EventsGeneratorSource extends RichParallelSourceFunction<Event> {
+
+       private final double errorProbability;
+
+       private final int delayPerRecordMillis;
+
+       /** Flag that keeps the emit loop going; cleared by {@link #cancel()}. */
+       private volatile boolean running = true;
+
+       /**
+        * Creates a new source.
+        *
+        * @param errorProbability The error probability handed to the {@link EventsGenerator}, in [0.0, 1.0].
+        * @param delayPerRecordMillis The time to sleep after each emitted record, in milliseconds; zero disables the delay.
+        */
+       public EventsGeneratorSource(double errorProbability, int delayPerRecordMillis) {
+               checkArgument(errorProbability >= 0.0 && errorProbability <= 1.0, "error probability must be in [0.0, 1.0]");
+               checkArgument(delayPerRecordMillis >= 0, "delay must be >= 0");
+
+               this.errorProbability = errorProbability;
+               this.delayPerRecordMillis = delayPerRecordMillis;
+       }
+
+       @Override
+       public void run(SourceContext<Event> sourceContext) throws Exception {
+               final EventsGenerator generator = new EventsGenerator(errorProbability);
+
+               // each parallel subtask draws new addresses from its own slice of the non-negative int range
+               final int range = Integer.MAX_VALUE / getRuntimeContext().getNumberOfParallelSubtasks();
+               final int min = range * getRuntimeContext().getIndexOfThisSubtask();
+               final int max = min + range;
+
+               while (running) {
+                       sourceContext.collect(generator.next(min, max));
+
+                       if (delayPerRecordMillis > 0) {
+                               Thread.sleep(delayPerRecordMillis);
+                       }
+               }
+       }
+
+       @Override
+       public void cancel() {
+               running = false;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/80883fee/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/StandaloneThreadedGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/StandaloneThreadedGenerator.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/StandaloneThreadedGenerator.java
new file mode 100644
index 0000000..a09005d
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/StandaloneThreadedGenerator.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.statemachine.generator;
+
+import org.apache.flink.streaming.examples.statemachine.event.Event;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+
+/**
+ * Base for standalone generators that use the state machine to create event
+ * sequences and push them for example into Kafka.
+ */
+public class StandaloneThreadedGenerator {
+
+       /**
+        * Starts one generator thread per collector plus a throughput logger, and then
+        * processes single-character commands from standard input until 'q' is read.
+        * On quit, the logger and all generator threads are shut down.
+        *
+        * <p>The non-negative int range is divided into equally sized slices, and each
+        * generator thread draws new addresses from its own slice.
+        *
+        * <p>If standard input reaches its end, no further commands can arrive. The calling
+        * thread then waits on the generator threads instead of polling the closed stream.
+        *
+        * @param collectors The targets for the generated events, one per generator thread; must not be empty.
+        * @throws IOException If reading from standard input fails.
+        * @throws IllegalArgumentException If no collector is given.
+        */
+       public static void runGenerator(Collector<Event>[] collectors) throws IOException {
+
+               if (collectors.length == 0) {
+                       // fail with a clear message instead of a division by zero below
+                       throw new IllegalArgumentException("At least one collector is required.");
+               }
+
+               final GeneratorThread[] threads = new GeneratorThread[collectors.length];
+               final int range = Integer.MAX_VALUE / collectors.length;
+
+               // create the generator threads
+               for (int i = 0; i < threads.length; i++) {
+                       int min = range * i;
+                       int max = min + range;
+                       GeneratorThread thread = new GeneratorThread(collectors[i], min, max);
+                       threads[i] = thread;
+                       thread.setName("Generator " + i);
+               }
+
+               long delay = 2L;
+               int nextErroneous = 0;
+               boolean running = true;
+
+               for (GeneratorThread t : threads) {
+                       t.setDelay(delay);
+                       t.start();
+               }
+
+               final ThroughputLogger throughputLogger = new ThroughputLogger(threads);
+               throughputLogger.start();
+
+               System.out.println("Commands:");
+               System.out.println(" -> q : Quit");
+               System.out.println(" -> + : increase latency");
+               System.out.println(" -> - : decrease latency");
+               System.out.println(" -> e : inject invalid state transition");
+
+               // input loop
+
+               while (running) {
+                       final int next = System.in.read();
+
+                       switch (next) {
+                               case 'q':
+                                       System.out.println("Quitting...");
+                                       running = false;
+                                       break;
+
+                               case 'e':
+                                       System.out.println("Injecting erroneous transition ...");
+                                       threads[nextErroneous].sendInvalidStateTransition();
+                                       // round-robin over the generator threads
+                                       nextErroneous = (nextErroneous + 1) % threads.length;
+                                       break;
+
+                               case '+':
+                                       // the max() lets the delay grow again after it was halved down to zero
+                                       delay = Math.max(delay * 2, 1);
+                                       System.out.println("Delay is " + delay);
+                                       for (GeneratorThread t : threads) {
+                                               t.setDelay(delay);
+                                       }
+                                       break;
+
+                               case '-':
+                                       delay /= 2;
+                                       System.out.println("Delay is " + delay);
+                                       for (GeneratorThread t : threads) {
+                                               t.setDelay(delay);
+                                       }
+                                       break;
+
+                               case -1:
+                                       // End of stream: read() would return -1 immediately from now on, so
+                                       // looping further would only burn CPU. Keep the generators running
+                                       // and park this thread on them until they end or it is interrupted.
+                                       System.out.println("Input closed, generators keep running until the process is terminated.");
+                                       try {
+                                               for (GeneratorThread t : threads) {
+                                                       t.join();
+                                               }
+                                       } catch (InterruptedException e) {
+                                               // restore interrupted status
+                                               Thread.currentThread().interrupt();
+                                       }
+                                       running = false;
+                                       break;
+
+                               default:
+                                       // do nothing
+                       }
+               }
+
+               // shutdown
+               throughputLogger.shutdown();
+
+               for (GeneratorThread t : threads) {
+                       t.shutdown();
+
+                       try {
+                               t.join();
+                       } catch (InterruptedException e) {
+                               // restore interrupted status
+                               Thread.currentThread().interrupt();
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * A thread running a {@link EventsGenerator} and pushing generated events to the given collector
+        * (such as Kafka / Socket / ...).
+        *
+        * <p>The delay, the shutdown flag, and the injection flag are set by other threads, and the
+        * record count is read by other threads. All of these fields are therefore volatile.
+        */
+       private static class GeneratorThread extends Thread {
+
+               private final Collector<Event> out;
+
+               private final int minAddress;
+               private final int maxAddress;
+
+               /** Sleep time between loop iterations in milliseconds; written by the controlling thread. */
+               private volatile long delay;
+
+               /**
+                * Number of completed loop iterations. Only this thread writes the field, so the
+                * non-atomic increment is safe; volatile makes the value visible to reader threads.
+                */
+               private volatile long count;
+
+               private volatile boolean running;
+
+               private volatile boolean injectInvalidNext;
+
+               /**
+                * Creates a new generator thread.
+                *
+                * @param out The collector to push the generated records to.
+                * @param minAddress The lower bound for the range from which a new IP address may be picked.
+                * @param maxAddress The upper bound for the range from which a new IP address may be picked.
+                */
+               GeneratorThread(Collector<Event> out, int minAddress, int maxAddress) {
+                       this.out = out;
+                       this.minAddress = minAddress;
+                       this.maxAddress = maxAddress;
+                       this.running = true;
+               }
+
+               @Override
+               public void run() {
+                       final EventsGenerator generator = new EventsGenerator();
+
+                       while (running) {
+                               if (injectInvalidNext) {
+                                       injectInvalidNext = false;
+                                       // null means the generator has no state machine to break at the moment
+                                       Event next = generator.nextInvalid();
+                                       if (next != null) {
+                                               out.collect(next);
+                                       }
+                               }
+                               else {
+                                       out.collect(generator.next(minAddress, maxAddress));
+                               }
+
+                               count += 1;
+
+                               // sleep the delay to throttle
+                               final long currentDelay = delay;
+                               if (currentDelay > 0) {
+                                       try {
+                                               Thread.sleep(currentDelay);
+                                       } catch (InterruptedException e) {
+                                               // restore interrupted status, the loop condition decides whether to stop
+                                               Thread.currentThread().interrupt();
+                                       }
+                               }
+                       }
+               }
+
+               /**
+                * Gets the number of loop iterations completed so far.
+                */
+               public long currentCount() {
+                       return count;
+               }
+
+               /**
+                * Asks the thread to stop and interrupts it, in case it is sleeping.
+                */
+               public void shutdown() {
+                       running = false;
+                       interrupt();
+               }
+
+               /**
+                * Sets the sleep time between loop iterations.
+                *
+                * @param delay The delay in milliseconds; values of zero or less disable the sleep.
+                */
+               public void setDelay(long delay) {
+                       this.delay = delay;
+               }
+
+               /**
+                * Requests that the next loop iteration emits an invalid state transition.
+                */
+               public void sendInvalidStateTransition() {
+                       injectInvalidNext = true;
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Thread that periodically prints the number of elements generated per second,
+        * summed over all generator threads.
+        */
+       private static class ThroughputLogger extends Thread {
+
+               private final GeneratorThread[] generators;
+
+               private volatile boolean running;
+
+               /**
+                * Instantiates the throughput logger.
+                *
+                * @param generators The generator threads whose aggregate throughput should be logged.
+                */
+               ThroughputLogger(GeneratorThread[] generators) {
+                       this.generators = generators;
+                       this.running = true;
+               }
+
+               @Override
+               public void run() {
+                       long lastCount = 0L;
+                       long lastTimeStamp = System.currentTimeMillis();
+
+                       while (running) {
+                               try {
+                                       Thread.sleep(1000);
+                               } catch (InterruptedException e) {
+                                       // interrupted by shutdown(): stop logging
+                                       break;
+                               }
+
+                               long ts = System.currentTimeMillis();
+                               long currCount = 0L;
+                               for (GeneratorThread generator : generators) {
+                                       currCount += generator.currentCount();
+                               }
+
+                               // divide by a double: integer division would truncate the elapsed
+                               // time to whole seconds and skew the reported rate
+                               double factor = (ts - lastTimeStamp) / 1000.0;
+                               double perSec = (currCount - lastCount) / factor;
+
+                               lastTimeStamp = ts;
+                               lastCount = currCount;
+
+                               System.out.println(perSec + " / sec");
+                       }
+               }
+
+               /**
+                * Asks the logger to stop and interrupts it, in case it is sleeping.
+                */
+               public void shutdown() {
+                       running = false;
+                       interrupt();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/80883fee/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/kafka/EventDeSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/kafka/EventDeSerializer.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/kafka/EventDeSerializer.java
new file mode 100644
index 0000000..259a611
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/kafka/EventDeSerializer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.statemachine.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.examples.statemachine.event.Event;
+import org.apache.flink.streaming.examples.statemachine.event.EventType;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * A serializer and deserializer for the {@link Event} type.
+ *
+ * <p>Binary layout (little endian): bytes 0-3 hold the source address, bytes 4-7 hold
+ * the ordinal of the {@link EventType}. {@link #serialize(Event)} always produces
+ * exactly 8 bytes; {@link #deserialize(byte[])} reads the first 8 bytes of a message.
+ */
+public class EventDeSerializer implements DeserializationSchema<Event>, SerializationSchema<Event> {
+
+       private static final long serialVersionUID = 1L;
+
+       /** Number of bytes in one serialized event (two 4-byte integers). */
+       private static final int SERIALIZED_SIZE = 8;
+
+       /** Byte offset of the source address within a serialized event. */
+       private static final int ADDRESS_OFFSET = 0;
+
+       /** Byte offset of the event type ordinal within a serialized event. */
+       private static final int TYPE_OFFSET = 4;
+
+       /** Cached once, because {@code values()} creates a new array on every call. */
+       private static final EventType[] EVENT_TYPES = EventType.values();
+
+       /**
+        * Encodes an event into its 8-byte little-endian representation.
+        *
+        * @param evt the event to encode
+        * @return a new 8-byte array holding the source address followed by the type ordinal
+        */
+       @Override
+       public byte[] serialize(Event evt) {
+               ByteBuffer byteBuffer = ByteBuffer.allocate(SERIALIZED_SIZE).order(ByteOrder.LITTLE_ENDIAN);
+               byteBuffer.putInt(ADDRESS_OFFSET, evt.sourceAddress());
+               byteBuffer.putInt(TYPE_OFFSET, evt.type().ordinal());
+               return byteBuffer.array();
+       }
+
+       /**
+        * Decodes an event from the first 8 bytes of the given message.
+        *
+        * @param message the serialized event; bytes beyond the first 8 are ignored
+        * @return the decoded event
+        * @throws IOException if the message is {@code null}, shorter than 8 bytes, or
+        *         carries a type ordinal that matches no {@link EventType}
+        */
+       @Override
+       public Event deserialize(byte[] message) throws IOException {
+               if (message == null) {
+                       throw new IOException("Cannot deserialize event: message is null");
+               }
+               if (message.length < SERIALIZED_SIZE) {
+                       throw new IOException("Cannot deserialize event: expected at least "
+                                       + SERIALIZED_SIZE + " bytes but got " + message.length);
+               }
+
+               ByteBuffer buffer = ByteBuffer.wrap(message).order(ByteOrder.LITTLE_ENDIAN);
+               int address = buffer.getInt(ADDRESS_OFFSET);
+               int typeOrdinal = buffer.getInt(TYPE_OFFSET);
+
+               // report corrupt data as the declared IOException instead of failing
+               // with an ArrayIndexOutOfBoundsException on the array lookup below
+               if (typeOrdinal < 0 || typeOrdinal >= EVENT_TYPES.length) {
+                       throw new IOException("Cannot deserialize event: unknown event type ordinal "
+                                       + typeOrdinal);
+               }
+               return new Event(EVENT_TYPES[typeOrdinal], address);
+       }
+
+       /**
+        * Always returns {@code false}: no element marks the end of the stream.
+        */
+       @Override
+       public boolean isEndOfStream(Event nextElement) {
+               return false;
+       }
+
+       /**
+        * Returns the type information for {@link Event}.
+        */
+       @Override
+       public TypeInformation<Event> getProducedType() {
+               return TypeInformation.of(Event.class);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/80883fee/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/kafka/KafkaStandaloneGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/kafka/KafkaStandaloneGenerator.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/kafka/KafkaStandaloneGenerator.java
new file mode 100644
index 0000000..a8cb14a
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/kafka/KafkaStandaloneGenerator.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.statemachine.kafka;
+
+import org.apache.flink.streaming.examples.statemachine.event.Event;
+import 
org.apache.flink.streaming.examples.statemachine.generator.StandaloneThreadedGenerator;
+import org.apache.flink.util.Collector;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A generator that pushes the data into Kafka.
+ *
+ * <p>One {@code KafkaCollector} (and therefore one Kafka producer) is created per
+ * partition. All collectors are closed again when the generator returns or fails.
+ */
+public class KafkaStandaloneGenerator extends StandaloneThreadedGenerator {
+
+       public static final String BROKER_ADDRESS = "localhost:9092";
+
+       public static final String TOPIC = "flink-demo-topic-1";
+
+       public static final int NUM_PARTITIONS = 1;
+
+       /**
+        * Entry point to the kafka data producer.
+        *
+        * @param args command line arguments (not used)
+        * @throws Exception if creating the producers or running the generator fails
+        */
+       public static void main(String[] args) throws Exception {
+
+               final KafkaCollector[] collectors = new KafkaCollector[NUM_PARTITIONS];
+
+               try {
+                       // create one collector per partition
+                       for (int i = 0; i < collectors.length; i++) {
+                               collectors[i] = new KafkaCollector(BROKER_ADDRESS, TOPIC, i);
+                       }
+
+                       StandaloneThreadedGenerator.runGenerator(collectors);
+               } finally {
+                       // send() is asynchronous, so the producers must be closed explicitly;
+                       // slots may still be null if a constructor above threw
+                       for (KafkaCollector collector : collectors) {
+                               if (collector != null) {
+                                       collector.close();
+                               }
+                       }
+               }
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * A collector that serializes each event and sends it to a fixed partition of a
+        * Kafka topic.
+        */
+       private static class KafkaCollector implements Collector<Event>, AutoCloseable {
+
+               /** Keys are always {@code null}; the type matches the configured key serializer. */
+               private final KafkaProducer<byte[], byte[]> producer;
+
+               private final EventDeSerializer serializer;
+
+               private final String topic;
+
+               private final int partition;
+
+               /**
+                * Creates a collector with its own Kafka producer.
+                *
+                * @param brokerAddress the bootstrap server address of the Kafka cluster
+                * @param topic the topic to write to
+                * @param partition the partition of the topic that receives every record
+                */
+               KafkaCollector(String brokerAddress, String topic, int partition) {
+                       checkNotNull(brokerAddress);
+                       this.topic = checkNotNull(topic);
+                       this.partition = partition;
+                       this.serializer = new EventDeSerializer();
+
+                       // create Kafka producer
+                       Properties properties = new Properties();
+                       properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
+                       // getName() yields the binary class name, which is what class loading needs
+                       properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+                       properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+                       this.producer = new KafkaProducer<>(properties);
+               }
+
+               /**
+                * Serializes the event and hands it to the producer, without a key, for this
+                * collector's topic and partition.
+                *
+                * @param evt the event to send
+                */
+               @Override
+               public void collect(Event evt) {
+                       byte[] serialized = serializer.serialize(evt);
+                       producer.send(new ProducerRecord<>(topic, partition, null, serialized));
+               }
+
+               /**
+                * Closes the underlying Kafka producer.
+                */
+               @Override
+               public void close() {
+                       producer.close();
+               }
+       }
+}
+

Reply via email to