[FLINK-8548] [examples] Add state machine example This adds an example of using a state machine for pattern validation. The example illustrates the use of state and the kafka connector.
This closes #5401 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/80883fee Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/80883fee Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/80883fee Branch: refs/heads/master Commit: 80883feeeba5232ecd41fe6759c69bf2ff1a6483 Parents: 85bfc07 Author: Stephan Ewen <[email protected]> Authored: Thu Feb 1 19:51:59 2018 +0100 Committer: Stephan Ewen <[email protected]> Committed: Sun Feb 18 22:24:34 2018 +0100 ---------------------------------------------------------------------- flink-examples/flink-examples-streaming/pom.xml | 36 ++- .../streaming/examples/statemachine/README.md | 52 ++++ .../statemachine/StateMachineExample.java | 161 ++++++++++++ .../statemachine/dfa/EventTypeAndState.java | 36 +++ .../examples/statemachine/dfa/State.java | 160 ++++++++++++ .../examples/statemachine/dfa/Transition.java | 101 ++++++++ .../examples/statemachine/event/Alert.java | 91 +++++++ .../examples/statemachine/event/Event.java | 104 ++++++++ .../examples/statemachine/event/EventType.java | 27 ++ .../statemachine/generator/EventsGenerator.java | 164 ++++++++++++ .../generator/EventsGeneratorSource.java | 68 +++++ .../generator/StandaloneThreadedGenerator.java | 253 +++++++++++++++++++ .../statemachine/kafka/EventDeSerializer.java | 63 +++++ .../kafka/KafkaStandaloneGenerator.java | 97 +++++++ 14 files changed, 1412 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/80883fee/flink-examples/flink-examples-streaming/pom.xml ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml index e3d51a9..c9367b7 100644 --- a/flink-examples/flink-examples-streaming/pom.xml +++ b/flink-examples/flink-examples-streaming/pom.xml @@ -358,7 +358,6 @@ 
under the License. </includes> </configuration> </execution> - </executions> </plugin> @@ -487,11 +486,45 @@ under the License. </executions> </plugin> + <!-- Use the shade plugin to build a fat jar for the kafka example --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <executions> + + <!-- State Machine Example --> + <execution> + <id>state-machine-example</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <shadeTestJar>false</shadeTestJar> + <shadedArtifactAttached>false</shadedArtifactAttached> + <createDependencyReducedPom>false</createDependencyReducedPom> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>org.apache.flink.streaming.examples.statemachine.StateMachineExample</mainClass> + </transformer> + </transformers> + <finalName>StateMachineExample</finalName> + <filters> + <filter> + <artifact>*</artifact> + <includes> + <include>org/apache/flink/streaming/examples/statemachine/**</include> + <include>org/apache/flink/streaming/util/serialization/**</include> + <include>org/apache/flink/streaming/connectors/kafka/**</include> + <include>org/apache/kafka/**</include> + <include>kafka/</include> + </includes> + </filter> + </filters> + </configuration> + </execution> + <execution> <id>fat-jar-kafka-010-example</id> <phase>package</phase> @@ -528,6 +561,7 @@ under the License. 
</filters> </configuration> </execution> + <execution> <id>fat-jar-twitter-example</id> <phase>package</phase> http://git-wip-us.apache.org/repos/asf/flink/blob/80883fee/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/README.md ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/README.md b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/README.md new file mode 100644 index 0000000..6b334a3 --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/README.md @@ -0,0 +1,52 @@ +Example: Running a state machine for pattern detection +====================================================== + +This example illustrates a minimal roll-your-own event pattern detection scenario, +using a simple state machine that is evaluated over the stream. + +While this example is much simpler and more manual than what the CEP library supports, +it illustrates the use of event processing and state management for a moderately +complex scenario. + +**Scenario Description** + +Events in streams are expected to occur in certain patterns. Any deviation from +these patterns indicates an anomaly that the streaming system should recognize and that +should trigger an alert. + +You can, for example, think of events as being generated by network devices and services, +such as firewalls, logins, and registrations with an authentication service, etc. A deviation +from the expected pattern might indicate an intrusion. + +The event patterns are tracked per interacting party (here simplified per source IP address) +and are validated by a state machine. The state machine's states define what possible +events may occur next, and what new states these events will result in.
+ +The following diagram depicts the state machine used in this example. + +``` + +--<a>--> W --<b>--> Y --<e>---+ + | ^ | + INITIAL-+ | | + | | +--> (Z) -----<g>---> TERM + +--<c>--> X --<b>----+ | + | | + +--------<d>---------+ +``` + +**Example Program** + +The main class of this example program is `org.apache.flink.streaming.examples.statemachine.StateMachineExample`. +The core logic is in the `flatMap` function, which runs the state machines per IP address. + +The streaming data flow is as shown below, where the source stream may come from either +an embedded data generator, or from a Kafka topic: + +``` + [ stream partition 1] --> source --> partition -+---> flatMap(state machine) --> sink + \/ + /\ + [ stream partition 2] --> source --> partition -+---> flatMap(state machine) --> sink +``` + + http://git-wip-us.apache.org/repos/asf/flink/blob/80883fee/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java new file mode 100644 index 0000000..052e954 --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.statemachine; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; +import org.apache.flink.streaming.examples.statemachine.dfa.State; +import org.apache.flink.streaming.examples.statemachine.event.Alert; +import org.apache.flink.streaming.examples.statemachine.event.Event; +import org.apache.flink.streaming.examples.statemachine.generator.EventsGeneratorSource; +import org.apache.flink.streaming.examples.statemachine.kafka.EventDeSerializer; +import org.apache.flink.util.Collector; + +import java.util.Properties; + +/** + * Main class of the state machine example. + * This class implements the streaming application that receives the stream of events and evaluates + * a state machine (per originating address) to validate that the events follow + * the state machine's rules. + */ +public class StateMachineExample { + + /** + * Main entry point for the program. + * + * @param args The command line arguments. 
+ */ + public static void main(String[] args) throws Exception { + + // ---- print some usage help ---- + + System.out.println("Usage with built-in data generator: StateMachineExample [--error-rate <probability-of-invalid-transition>] [--sleep <sleep-per-record-in-ms>]"); + System.out.println("Usage with Kafka: StateMachineExample --kafka-topic <topic> [--brokers <brokers>]"); + System.out.println(); + + // ---- determine whether to use the built-in source, or read from Kafka ---- + + final SourceFunction<Event> source; + final ParameterTool params = ParameterTool.fromArgs(args); + + if (params.has("kafka-topic")) { + // set up the Kafka reader + String kafkaTopic = params.get("kafka-topic"); + String brokers = params.get("brokers", "localhost:9092"); + + System.out.printf("Reading from kafka topic %s @ %s\n", kafkaTopic, brokers); + System.out.println(); + + Properties kafkaProps = new Properties(); + kafkaProps.setProperty("bootstrap.servers", brokers); + + FlinkKafkaConsumer010<Event> kafka = new FlinkKafkaConsumer010<>(kafkaTopic, new EventDeSerializer(), kafkaProps); + kafka.setStartFromLatest(); + kafka.setCommitOffsetsOnCheckpoints(false); + source = kafka; + } + else { + double errorRate = params.getDouble("error-rate", 0.0); + int sleep = params.getInt("sleep", 1); + + System.out.printf("Using standalone source with error rate %f and sleep delay %s millis\n", errorRate, sleep); + System.out.println(); + + source = new EventsGeneratorSource(errorRate, sleep); + } + + // ---- main program ---- + + // create the environment to create streams and configure execution + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(5000); + + DataStream<Event> events = env.addSource(source); + + DataStream<Alert> alerts = events + // partition on the address to make sure equal addresses + // end up in the same state machine flatMap function + .keyBy(Event::sourceAddress) + + // the function that evaluates 
the state machine over the sequence of events + .flatMap(new StateMachineMapper()); + + // output the alerts to std-out + alerts.print(); + + // trigger program execution + env.execute(); + } + + // ------------------------------------------------------------------------ + + /** + * The function that maintains the per-IP-address state machines and verifies that the + * events are consistent with the current state of the state machine. If the event is not + * consistent with the current state, the function produces an alert. + */ + @SuppressWarnings("serial") + static class StateMachineMapper extends RichFlatMapFunction<Event, Alert> { + + /** The state for the current key. */ + private ValueState<State> currentState; + + @Override + public void open(Configuration conf) { + // get access to the state object + currentState = getRuntimeContext().getState( + new ValueStateDescriptor<>("state", State.class)); + } + + @Override + public void flatMap(Event evt, Collector<Alert> out) throws Exception { + // get the current state for the key (source address) + // if no state exists yet, the state must be the state machine's initial state + State state = currentState.value(); + if (state == null) { + state = State.Initial; + } + + // ask the state machine what state we should go to based on the given event + State nextState = state.transition(evt.type()); + + if (nextState == State.InvalidTransition) { + // the current event resulted in an invalid transition + // raise an alert!
+ out.collect(new Alert(evt.sourceAddress(), state, evt.type())); + } + else if (nextState.isTerminal()) { + // we reached a terminal state, clean up the current state + currentState.clear(); + } + else { + // remember the new state + currentState.update(nextState); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/80883fee/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/dfa/EventTypeAndState.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/dfa/EventTypeAndState.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/dfa/EventTypeAndState.java new file mode 100644 index 0000000..df958ae --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/dfa/EventTypeAndState.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.statemachine.dfa; + +import org.apache.flink.streaming.examples.statemachine.event.EventType; + +/** + * Simple combination of EventType and State. + */ +public class EventTypeAndState { + + public final EventType eventType; + + public final State state; + + public EventTypeAndState(EventType eventType, State state) { + this.eventType = eventType; + this.state = state; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/80883fee/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/dfa/State.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/dfa/State.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/dfa/State.java new file mode 100644 index 0000000..8c7b0e2 --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/dfa/State.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.statemachine.dfa; + +import org.apache.flink.streaming.examples.statemachine.event.EventType; + +import java.util.Random; + +/** + * The State captures the main functionality of the state machine. It represents + * a specific state in the state machine, and holds all transitions possible + * from a specific state. + * + * <p>The state transition diagram is as follows: + * <pre> + * +--[a]--> W --[b]--> Y --[e]---+ + * | ^ | + * Initial-+ | | + * | | +--> (Z)-----[g]---> Terminal + * +--[c]--> X --[b]----+ | + * | | + * +--------[d]---------+ + * </pre> + */ +public enum State { + + /** + * The terminal state in the state machine. + */ + Terminal, + + /** + * Special state returned by the State.transition(...) function when attempting + * an illegal state transition. + */ + InvalidTransition, + + /** + * State 'Z'. + */ + Z (new Transition(EventType.g, Terminal, 1.0f)), + + /** + * State 'Y'. + */ + Y (new Transition(EventType.e, Z, 1.0f)), + + /** + * State 'X'. + */ + X (new Transition(EventType.b, Y, 0.2f), new Transition(EventType.d, Z, 0.8f)), + + /** + * State 'W'. + */ + W (new Transition(EventType.b, Y, 1.0f)), + + /** + * The initial state from which all state sequences start. + */ + Initial(new Transition(EventType.a, W, 0.6f), new Transition(EventType.c, X, 0.4f)); + + // ------------------------------------------------------------------------ + + private final Transition[] transitions; + + State(Transition... transitions) { + this.transitions = transitions; + } + + /** + * Checks if this state is a terminal state. + * A terminal state has no outgoing transitions. + */ + public boolean isTerminal() { + return transitions.length == 0; + } + + // ------------------------------------------------------------------------ + + /** + * Gets the state after transitioning from this state based on the given event. 
+ * If the transition is valid, this returns the new state, and if this transition + * is illegal, it returns {@link #InvalidTransition}. + * + * @param evt The event that defines the transition. + * @return The new state, or {@link #InvalidTransition}. + */ + public State transition(EventType evt) { + for (Transition t : transitions) { + if (t.eventType() == evt) { + return t.targetState(); + } + } + + // no transition found + return InvalidTransition; + } + + /** + * Picks a random transition, based on the probabilities of the outgoing transitions + * of this state. + * + * @param rnd The random number generator to use. + * @return A pair of (transition event, new state). + */ + public EventTypeAndState randomTransition(Random rnd) { + if (isTerminal()) { + throw new RuntimeException("Cannot transition from state " + name()); + } + else { + final float p = rnd.nextFloat(); + float mass = 0.0f; + Transition transition = null; + + for (Transition t : transitions) { + mass += t.prob(); + if (p <= mass) { + transition = t; + break; + } + } + + assert transition != null; + return new EventTypeAndState(transition.eventType(), transition.targetState()); + } + } + + /** + * Returns an event type that, if applied as a transition on this state, will result + * in an illegal state transition. + * + * @param rnd The random number generator to use. + * @return An event type for an illegal state transition.
+ */ + public EventType randomInvalidTransition(Random rnd) { + while (true) { + EventType candidate = EventType.values()[rnd.nextInt(EventType.values().length)]; + if (transition(candidate) == InvalidTransition) { + return candidate; + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/80883fee/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/dfa/Transition.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/dfa/Transition.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/dfa/Transition.java new file mode 100644 index 0000000..470831d --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/dfa/Transition.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.statemachine.dfa; + +import org.apache.flink.streaming.examples.statemachine.event.EventType; + +import java.io.Serializable; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A possible transition on a given event into a target state. The transition + * belongs to its originating state and has an associated probability that is + * used to generate random transition events. + */ +public class Transition implements Serializable { + + // this class is serializable to be able to interact cleanly with enums. + private static final long serialVersionUID = 1L; + + /** The event that triggers the transition. */ + private final EventType eventType; + + /** The target state after the transition. */ + private final State targetState; + + /** The probability of the transition. */ + private final float prob; + + /** + * Creates a new transition. + * + * @param eventType The event that triggers the transition. + * @param targetState The target state after the transition. + * @param prob The probability of the transition. 
+ */ + public Transition(EventType eventType, State targetState, float prob) { + this.eventType = checkNotNull(eventType); + this.targetState = checkNotNull(targetState); + this.prob = prob; + } + + // ------------------------------------------------------------------------ + + public EventType eventType() { + return eventType; + } + + public State targetState() { + return targetState; + } + + public float prob() { + return prob; + } + + // ------------------------------------------------------------------------ + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + else if (obj == null || getClass() != obj.getClass()) { + return false; + } + else { + final Transition that = (Transition) obj; + return this.eventType == that.eventType && + this.targetState == that.targetState && + Float.compare(this.prob, that.prob) == 0; + } + } + + @Override + public int hashCode() { + int code = 31 * eventType.hashCode() + targetState.hashCode(); + return 31 * code + (prob != +0.0f ? 
Float.floatToIntBits(prob) : 0); + } + + @Override + public String toString() { + return "--[" + eventType.name() + "]--> " + targetState.name() + " (" + prob + ')'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/80883fee/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/event/Alert.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/event/Alert.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/event/Alert.java new file mode 100644 index 0000000..9b3a5f4 --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/event/Alert.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.statemachine.event; + +import org.apache.flink.streaming.examples.statemachine.dfa.State; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Data type for alerts. 
+ */ +public class Alert { + + private final int address; + + private final State state; + + private final EventType transition; + + /** + * Creates a new alert. + * + * @param address The originating address (think 32 bit IPv4 address). + * @param state The state that the event state machine found. + * @param transition The transition that was considered invalid. + */ + public Alert(int address, State state, EventType transition) { + this.address = address; + this.state = checkNotNull(state); + this.transition = checkNotNull(transition); + } + + // ------------------------------------------------------------------------ + + public int address() { + return address; + } + + public State state() { + return state; + } + + public EventType transition() { + return transition; + } + + // ------------------------------------------------------------------------ + + @Override + public int hashCode() { + int code = 31 * address + state.hashCode(); + return 31 * code + transition.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + else if (obj == null || getClass() != obj.getClass()) { + return false; + } + else { + final Alert that = (Alert) obj; + return this.address == that.address && + this.transition == that.transition && + this.state == that.state; + } + } + + @Override + public String toString() { + return "ALERT " + Event.formatAddress(address) + " : " + state.name() + " -> " + transition.name(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/80883fee/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/event/Event.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/event/Event.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/event/Event.java new file 
mode 100644 index 0000000..cfe4cfc --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/event/Event.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.statemachine.event; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Data type for events, consisting of the originating IP address and an event type. + */ +public class Event { + + private final EventType type; + + private final int sourceAddress; + + /** + * Creates a new event. + * + * @param type The event type. + * @param sourceAddress The originating address (think 32 bit IPv4 address). + */ + public Event(EventType type, int sourceAddress) { + this.type = checkNotNull(type); + this.sourceAddress = sourceAddress; + } + + /** + * Gets the event's type. + */ + public EventType type() { + return type; + } + + /** + * Gets the event's source address. 
+ */ + public int sourceAddress() { + return sourceAddress; + } + + // ------------------------------------------------------------------------ + // Miscellaneous + // ------------------------------------------------------------------------ + + @Override + public int hashCode() { + return 31 * type.hashCode() + sourceAddress; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + else if (obj == null || getClass() != obj.getClass()) { + return false; + } + else { + final Event that = (Event) obj; + return this.type == that.type && this.sourceAddress == that.sourceAddress; + } + } + + @Override + public String toString() { + return "Event " + formatAddress(sourceAddress) + " : " + type.name(); + } + + // ------------------------------------------------------------------------ + // Utils + // ------------------------------------------------------------------------ + + /** + * Util method to create a string representation of a 32 bit integer representing + * an IPv4 address. + * + * @param address The address, MSB first. + * @return The IP address string. + */ + public static String formatAddress(int address) { + int b1 = (address >>> 24) & 0xff; + int b2 = (address >>> 16) & 0xff; + int b3 = (address >>> 8) & 0xff; + int b4 = address & 0xff; + + return "" + b1 + '.' + b2 + '.' + b3 + '.' 
+ b4; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/80883fee/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/event/EventType.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/event/EventType.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/event/EventType.java new file mode 100644 index 0000000..d7482ef --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/event/EventType.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.statemachine.event; + +/** + * The type of the event processed by the state machine. 
+ */
+public enum EventType {
+
+    // NOTE: EventDeSerializer writes and reads these constants by ordinal,
+    // so the declaration order is part of the serialized format.
+    a,
+    b,
+    c,
+    d,
+    e,
+    f,
+    g
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/80883fee/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/EventsGenerator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/EventsGenerator.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/EventsGenerator.java
new file mode 100644
index 0000000..ed3c45b
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/EventsGenerator.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.statemachine.generator;
+
+import org.apache.flink.streaming.examples.statemachine.dfa.EventTypeAndState;
+import org.apache.flink.streaming.examples.statemachine.dfa.State;
+import org.apache.flink.streaming.examples.statemachine.event.Event;
+import org.apache.flink.streaming.examples.statemachine.event.EventType;
+
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map.Entry;
+import java.util.Random;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A generator for events. The generator internally maintains a series of state
+ * machines (addresses and current associated state) and returns transition events
+ * from those state machines. Each time the next event is generated, this
+ * generator picks a random state machine and creates a random transition on that
+ * state machine.
+ *
+ * <p>The generator randomly adds new state machines, and removes state machines as
+ * soon as they reach the terminal state. This implementation maintains up to
+ * 1000 state machines concurrently.
+ */
+public class EventsGenerator {
+
+    /** Upper bound on the number of state machines tracked at the same time. */
+    private static final int MAX_ACTIVE_MACHINES = 1000;
+
+    /** Upper bound on how many entries are skipped when picking an existing machine. */
+    private static final int MAX_SKIP = 20;
+
+    /** The random number generator. */
+    private final Random rnd;
+
+    /** The currently active state machines, keyed by address, in insertion order. */
+    private final LinkedHashMap<Integer, State> states;
+
+    /** Probability with which this generator generates an illegal state transition. */
+    private final double errorProb;
+
+    /**
+     * Creates a generator that never produces illegal transitions on its own.
+     */
+    public EventsGenerator() {
+        this(0.0);
+    }
+
+    /**
+     * Creates a generator with the given error probability.
+     *
+     * @param errorProb The probability of an illegal transition, in the range [0.0, 1.0].
+     */
+    public EventsGenerator(double errorProb) {
+        checkArgument(errorProb >= 0.0 && errorProb <= 1.0, "Invalid error probability");
+
+        this.rnd = new Random();
+        this.states = new LinkedHashMap<>();
+        this.errorProb = errorProb;
+    }
+
+    // ------------------------------------------------------------------------
+
+    /**
+     * Creates a new random event. This method randomly picks either
+     * one of its currently running state machines, or starts a new state machine for
+     * a random IP address.
+     *
+     * <p>With {@link #errorProb} probability, the generated event will be from an illegal state
+     * transition of one of the currently running state machines.
+     *
+     * @param minIp The lower bound for the range from which a new IP address may be picked.
+     * @param maxIp The upper bound for the range from which a new IP address may be picked.
+     * @return A next random event.
+     */
+    public Event next(int minIp, int maxIp) {
+        // every pass through the loop is one complete attempt; an attempt only
+        // fails when a freshly drawn address is already in use
+        while (true) {
+            final double p = rnd.nextDouble();
+
+            if (p * MAX_ACTIVE_MACHINES < states.size()) {
+                return advanceExistingMachine(p);
+            }
+
+            // start a new state machine
+            final int candidate = minIp + rnd.nextInt(maxIp - minIp);
+            if (states.containsKey(candidate)) {
+                // collision on IP address, try again
+                continue;
+            }
+
+            final EventTypeAndState first = State.Initial.randomTransition(rnd);
+            states.put(candidate, first.state);
+            return new Event(first.eventType, candidate);
+        }
+    }
+
+    /**
+     * Picks one of the tracked state machines and emits its next transition.
+     *
+     * <p>The chosen machine is removed from the map. It is re-inserted at the end
+     * only if the transition was legal and did not lead to a terminal state.
+     *
+     * @param p The random draw of the current attempt; values below
+     *          {@link #errorProb} trigger an illegal transition.
+     * @return The event for the transition.
+     */
+    private Event advanceExistingMachine(double p) {
+        // skip over some elements in the linked map, then take the next one
+        final Iterator<Entry<Integer, State>> cursor = states.entrySet().iterator();
+        int remaining = Math.min(MAX_SKIP, rnd.nextInt(states.size()));
+        while (remaining > 0) {
+            cursor.next();
+            remaining--;
+        }
+
+        final Entry<Integer, State> picked = cursor.next();
+        final State currentState = picked.getValue();
+        final int address = picked.getKey();
+        cursor.remove();
+
+        if (p < errorProb) {
+            final EventType illegal = currentState.randomInvalidTransition(rnd);
+            return new Event(illegal, address);
+        }
+
+        final EventTypeAndState transition = currentState.randomTransition(rnd);
+        if (!transition.state.isTerminal()) {
+            // reinsert at the end of the map
+            states.put(address, transition.state);
+        }
+        return new Event(transition.eventType, address);
+    }
+
+    /**
+     * Creates an event for an illegal state transition of one of the internal
+     * state machines. If the generator has not yet started any state machines
+     * (for example, because no call to {@link #next(int, int)} was made, yet), this
+     * will return null.
+     *
+     * <p>The affected state machine is the first one in the map, and it is removed.
+     *
+     * @return An event for an illegal state transition, or null, if not possible.
+     */
+    @Nullable
+    public Event nextInvalid() {
+        if (states.isEmpty()) {
+            return null;
+        }
+
+        final Iterator<Entry<Integer, State>> cursor = states.entrySet().iterator();
+        final Entry<Integer, State> first = cursor.next();
+        final State currentState = first.getValue();
+        final int address = first.getKey();
+        cursor.remove();
+
+        final EventType illegal = currentState.randomInvalidTransition(rnd);
+        return new Event(illegal, address);
+    }
+
+    /**
+     * Gets the number of state machines that are currently tracked.
+     */
+    public int numActiveEntries() {
+        return states.size();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/80883fee/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/EventsGeneratorSource.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/EventsGeneratorSource.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/EventsGeneratorSource.java
new file mode 100644
index 0000000..74fa9ed
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/EventsGeneratorSource.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.statemachine.generator;
+
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.examples.statemachine.event.Event;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * An event stream source that generates the events on the fly. Useful for
+ * self-contained demos.
+ *
+ * <p>Every parallel subtask runs its own {@link EventsGenerator} and draws
+ * addresses from its own slice of the range {@code [0, Integer.MAX_VALUE)}, so
+ * that different subtasks never generate events for the same address.
+ */
+@SuppressWarnings("serial")
+public class EventsGeneratorSource extends RichParallelSourceFunction<Event> {
+
+    /** Probability with which the generator emits an illegal state transition. */
+    private final double errorProbability;
+
+    /** Pause after each emitted record, in milliseconds. Zero disables throttling. */
+    private final int delayPerRecordMillis;
+
+    /** Flag that is cleared by {@link #cancel()} to stop the emit loop. */
+    private volatile boolean running = true;
+
+    /**
+     * Creates a new source.
+     *
+     * @param errorProbability The probability of an illegal transition, in [0.0, 1.0].
+     * @param delayPerRecordMillis The pause after each record in milliseconds, at least 0.
+     * @throws IllegalArgumentException Thrown, if one of the arguments is out of range.
+     */
+    public EventsGeneratorSource(double errorProbability, int delayPerRecordMillis) {
+        checkArgument(errorProbability >= 0.0 && errorProbability <= 1.0, "error probability must be in [0.0, 1.0]");
+        checkArgument(delayPerRecordMillis >= 0, "delay must be >= 0");
+
+        this.errorProbability = errorProbability;
+        this.delayPerRecordMillis = delayPerRecordMillis;
+    }
+
+    @Override
+    public void run(SourceContext<Event> sourceContext) throws Exception {
+        final EventsGenerator generator = new EventsGenerator(errorProbability);
+
+        // split the non-negative int range evenly across the parallel subtasks
+        final int range = Integer.MAX_VALUE / getRuntimeContext().getNumberOfParallelSubtasks();
+        final int min = range * getRuntimeContext().getIndexOfThisSubtask();
+        final int max = min + range;
+
+        while (running) {
+            sourceContext.collect(generator.next(min, max));
+
+            if (delayPerRecordMillis > 0) {
+                Thread.sleep(delayPerRecordMillis);
+            }
+        }
+    }
+
+    @Override
+    public void cancel() {
+        running = false;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/80883fee/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/StandaloneThreadedGenerator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/StandaloneThreadedGenerator.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/StandaloneThreadedGenerator.java
new file mode 100644
index 0000000..a09005d
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/StandaloneThreadedGenerator.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.statemachine.generator;
+
+import org.apache.flink.streaming.examples.statemachine.event.Event;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+
+/**
+ * Base for standalone generators that use the state machine to create event
+ * sequences and push them for example into Kafka.
+ */
+public class StandaloneThreadedGenerator {
+
+    /**
+     * Starts one generator thread per collector plus a throughput logger, then reads
+     * single-character commands from standard input until 'q' is entered. On quit, the
+     * logger and all generator threads are shut down and joined before the method returns.
+     *
+     * <p>Each generator thread draws addresses from its own slice of the range
+     * {@code [0, Integer.MAX_VALUE)}.
+     *
+     * @param collectors The collectors to push the events to, one per generator thread.
+     * @throws IOException Thrown, if reading from standard input fails.
+     */
+    public static void runGenerator(Collector<Event>[] collectors) throws IOException {
+
+        final GeneratorThread[] threads = new GeneratorThread[collectors.length];
+        final int range = Integer.MAX_VALUE / collectors.length;
+
+        // create the generator threads
+        for (int i = 0; i < threads.length; i++) {
+            int min = range * i;
+            int max = min + range;
+            GeneratorThread thread = new GeneratorThread(collectors[i], min, max);
+            threads[i] = thread;
+            thread.setName("Generator " + i);
+        }
+
+        long delay = 2L;
+        int nextErroneous = 0;
+        boolean running = true;
+
+        for (GeneratorThread t : threads) {
+            t.setDelay(delay);
+            t.start();
+        }
+
+        final ThroughputLogger throughputLogger = new ThroughputLogger(threads);
+        throughputLogger.start();
+
+        System.out.println("Commands:");
+        System.out.println(" -> q : Quit");
+        System.out.println(" -> + : increase latency");
+        System.out.println(" -> - : decrease latency");
+        System.out.println(" -> e : inject invalid state transition");
+
+        // input loop
+
+        while (running) {
+            final int next = System.in.read();
+
+            switch (next) {
+                case 'q':
+                    System.out.println("Quitting...");
+                    running = false;
+                    break;
+
+                case 'e':
+                    System.out.println("Injecting erroneous transition ...");
+                    // the threads take turns at emitting the invalid transition
+                    threads[nextErroneous].sendInvalidStateTransition();
+                    nextErroneous = (nextErroneous + 1) % threads.length;
+                    break;
+
+                case '+':
+                    // the max(..., 1) lets the delay grow again after it was halved down to zero
+                    delay = Math.max(delay * 2, 1);
+                    System.out.println("Delay is " + delay);
+                    for (GeneratorThread t : threads) {
+                        t.setDelay(delay);
+                    }
+                    break;
+
+                case '-':
+                    delay /= 2;
+                    System.out.println("Delay is " + delay);
+                    for (GeneratorThread t : threads) {
+                        t.setDelay(delay);
+                    }
+                    break;
+
+                default:
+                    // do nothing
+            }
+        }
+
+        // shutdown
+        throughputLogger.shutdown();
+
+        for (GeneratorThread t : threads) {
+            t.shutdown();
+
+            try {
+                t.join();
+            } catch (InterruptedException e) {
+                // restore interrupted status
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    // ------------------------------------------------------------------------
+
+    /**
+     * A thread running a {@link EventsGenerator} and pushing generated events to the given collector
+     * (such as Kafka / Socket / ...).
+     */
+    private static class GeneratorThread extends Thread {
+
+        private final Collector<Event> out;
+
+        private final int minAddress;
+        private final int maxAddress;
+
+        /** Pause between events in milliseconds. Written by the controlling thread, hence volatile. */
+        private volatile long delay;
+
+        /** Number of loop iterations so far. Read by the throughput logger, hence volatile. */
+        private volatile long count;
+
+        private volatile boolean running;
+
+        private volatile boolean injectInvalidNext;
+
+        /**
+         * Creates a new generator thread.
+         *
+         * @param out The collector to push the generated records to.
+         * @param minAddress The lower bound for the range from which a new IP address may be picked.
+         * @param maxAddress The upper bound for the range from which a new IP address may be picked.
+         */
+        GeneratorThread(Collector<Event> out, int minAddress, int maxAddress) {
+            this.out = out;
+            this.minAddress = minAddress;
+            this.maxAddress = maxAddress;
+            this.running = true;
+        }
+
+        @Override
+        public void run() {
+            final EventsGenerator generator = new EventsGenerator();
+
+            while (running) {
+                if (injectInvalidNext) {
+                    injectInvalidNext = false;
+                    Event next = generator.nextInvalid();
+                    if (next != null) {
+                        out.collect(next);
+                    }
+                }
+                else {
+                    out.collect(generator.next(minAddress, maxAddress));
+                }
+
+                // this thread is the only writer, so the non-atomic increment is safe
+                count += 1;
+
+                // sleep the delay to throttle; read the field once so that a concurrent
+                // update cannot slip in between the check and the sleep
+                final long pause = delay;
+                if (pause > 0) {
+                    try {
+                        Thread.sleep(pause);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            }
+        }
+
+        public long currentCount() {
+            return count;
+        }
+
+        public void shutdown() {
+            running = false;
+            interrupt();
+        }
+
+        public void setDelay(long delay) {
+            this.delay = delay;
+        }
+
+        public void sendInvalidStateTransition() {
+            injectInvalidNext = true;
+        }
+    }
+
+    // ------------------------------------------------------------------------
+
+    /**
+     * Thread that periodically prints the number of elements generated per second.
+     */
+    private static class ThroughputLogger extends Thread {
+
+        private final GeneratorThread[] generators;
+
+        private volatile boolean running;
+
+        /**
+         * Instantiates the throughput logger.
+         *
+         * @param generators The generator threads whose aggregate throughput should be logged.
+         */
+        ThroughputLogger(GeneratorThread[] generators) {
+            this.generators = generators;
+            this.running = true;
+        }
+
+        @Override
+        public void run() {
+            long lastCount = 0L;
+            long lastTimeStamp = System.currentTimeMillis();
+
+            while (running) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    break;
+                }
+
+                long ts = System.currentTimeMillis();
+                long currCount = 0L;
+                for (GeneratorThread generator : generators) {
+                    currCount += generator.currentCount();
+                }
+
+                // divide by a double: a long division would truncate the elapsed time
+                // to whole seconds and distort the rate (or even produce a zero divisor)
+                double elapsedSeconds = (ts - lastTimeStamp) / 1000.0;
+                double perSec = (currCount - lastCount) / elapsedSeconds;
+
+                lastTimeStamp = ts;
+                lastCount = currCount;
+
+                System.out.println(perSec + " / sec");
+            }
+        }
+
+        public void shutdown() {
+            running = false;
+            interrupt();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/80883fee/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/kafka/EventDeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/kafka/EventDeSerializer.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/kafka/EventDeSerializer.java
new file mode 100644
index 0000000..259a611
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/kafka/EventDeSerializer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.statemachine.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.examples.statemachine.event.Event;
+import org.apache.flink.streaming.examples.statemachine.event.EventType;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * A serializer and deserializer for the {@link Event} type.
+ *
+ * <p>An event is encoded as eight bytes in little-endian byte order: the source
+ * address as a 4-byte integer, followed by the ordinal of the event type as a
+ * 4-byte integer.
+ */
+public class EventDeSerializer implements DeserializationSchema<Event>, SerializationSchema<Event> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Number of bytes of one serialized event. */
+    private static final int SERIALIZED_SIZE = 8;
+
+    /** Cached constants, because {@code EventType.values()} creates a new array on every call. */
+    private static final EventType[] EVENT_TYPES = EventType.values();
+
+    @Override
+    public byte[] serialize(Event evt) {
+        ByteBuffer byteBuffer = ByteBuffer.allocate(SERIALIZED_SIZE).order(ByteOrder.LITTLE_ENDIAN);
+        byteBuffer.putInt(0, evt.sourceAddress());
+        byteBuffer.putInt(4, evt.type().ordinal());
+        return byteBuffer.array();
+    }
+
+    /**
+     * Decodes an event from its serialized form. Bytes after the first eight are ignored.
+     *
+     * @param message The serialized event.
+     * @return The decoded event.
+     * @throws IOException Thrown, if the message is null, shorter than eight bytes, or
+     *                     carries an ordinal that does not match any {@link EventType}.
+     */
+    @Override
+    public Event deserialize(byte[] message) throws IOException {
+        // the bytes come from an external system, so report malformed input through the
+        // declared exception instead of failing with an index-out-of-bounds error
+        if (message == null || message.length < SERIALIZED_SIZE) {
+            throw new IOException("Malformed event: expected at least " + SERIALIZED_SIZE
+                    + " bytes, but got " + (message == null ? "null" : message.length + " bytes"));
+        }
+
+        ByteBuffer buffer = ByteBuffer.wrap(message).order(ByteOrder.LITTLE_ENDIAN);
+        int address = buffer.getInt(0);
+        int typeOrdinal = buffer.getInt(4);
+
+        if (typeOrdinal < 0 || typeOrdinal >= EVENT_TYPES.length) {
+            throw new IOException("Malformed event: unknown event type ordinal " + typeOrdinal);
+        }
+        return new Event(EVENT_TYPES[typeOrdinal], address);
+    }
+
+    @Override
+    public boolean isEndOfStream(Event nextElement) {
+        // the event stream is unbounded
+        return false;
+    }
+
+    @Override
+    public TypeInformation<Event> getProducedType() {
+        return TypeInformation.of(Event.class);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/80883fee/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/kafka/KafkaStandaloneGenerator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/kafka/KafkaStandaloneGenerator.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/kafka/KafkaStandaloneGenerator.java
new file mode 100644
index 0000000..a8cb14a
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/kafka/KafkaStandaloneGenerator.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.statemachine.kafka;
+
+import org.apache.flink.streaming.examples.statemachine.event.Event;
+import org.apache.flink.streaming.examples.statemachine.generator.StandaloneThreadedGenerator;
+import org.apache.flink.util.Collector;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A generator that pushes the data into Kafka.
+ */
+public class KafkaStandaloneGenerator extends StandaloneThreadedGenerator {
+
+    public static final String BROKER_ADDRESS = "localhost:9092";
+
+    public static final String TOPIC = "flink-demo-topic-1";
+
+    public static final int NUM_PARTITIONS = 1;
+
+    /**
+     * Entry point to the kafka data producer.
+     *
+     * <p>Creates one collector (and thereby one generator thread) per partition and
+     * closes all collectors once the generator returns or fails.
+     */
+    public static void main(String[] args) throws Exception {
+
+        final KafkaCollector[] collectors = new KafkaCollector[NUM_PARTITIONS];
+
+        try {
+            // create the collectors, one per partition
+            for (int i = 0; i < collectors.length; i++) {
+                collectors[i] = new KafkaCollector(BROKER_ADDRESS, TOPIC, i);
+            }
+
+            StandaloneThreadedGenerator.runGenerator(collectors);
+        }
+        finally {
+            // close the producers so that their resources are released; entries may
+            // still be null if creating one of the collectors failed
+            for (KafkaCollector collector : collectors) {
+                if (collector != null) {
+                    collector.close();
+                }
+            }
+        }
+    }
+
+    // ------------------------------------------------------------------------
+
+    /**
+     * A collector that serializes each event and sends it to a fixed partition of a Kafka topic.
+     */
+    private static class KafkaCollector implements Collector<Event>, AutoCloseable {
+
+        private final KafkaProducer<Object, byte[]> producer;
+
+        private final EventDeSerializer serializer;
+
+        private final String topic;
+
+        private final int partition;
+
+        /**
+         * Creates a collector with its own Kafka producer.
+         *
+         * @param brokerAddress The address of the Kafka broker to connect to.
+         * @param topic The topic to write to.
+         * @param partition The partition of the topic to write to.
+         */
+        KafkaCollector(String brokerAddress, String topic, int partition) {
+            this.topic = checkNotNull(topic);
+            this.partition = partition;
+            this.serializer = new EventDeSerializer();
+
+            // create Kafka producer
+            Properties properties = new Properties();
+            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
+            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+            this.producer = new KafkaProducer<>(properties);
+        }
+
+        @Override
+        public void collect(Event evt) {
+            byte[] serialized = serializer.serialize(evt);
+            // records are sent without a key, to the partition of this collector
+            producer.send(new ProducerRecord<>(topic, partition, null, serialized));
+        }
+
+        @Override
+        public void close() {
+            producer.close();
+        }
+    }
+}
+
