sjwiesman commented on a change in pull request #9210: [FLINK-12746][docs] Getting Started - DataStream Example Walkthrough
URL: https://github.com/apache/flink/pull/9210#discussion_r315946232
##########
File path: docs/getting-started/walkthroughs/datastream_api.md
##########
@@ -0,0 +1,923 @@
+---
+title: "DataStream API"
+nav-id: datastreamwalkthrough
+nav-title: 'DataStream API'
+nav-parent_id: walkthroughs
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Apache Flink offers a DataStream API for building robust, stateful streaming applications.
+It provides fine-grained control over state and time, which allows for the implementation of complex event-driven systems.
+
+* This will be replaced by the TOC
+{:toc}
+
+## What Are You Building?
+
+Credit card fraud is a growing concern in the digital age.
+Criminals steal credit card numbers by running scams or hacking into insecure systems.
+Stolen numbers are tested by making one or more small purchases, often for a dollar or less.
+If that works, they then make more significant purchases to get items they can sell or keep for themselves.
+
+In this tutorial, you will build a fraud detection system for alerting on suspicious credit card transactions.
+Using a simple set of rules, you will see how Flink allows you to implement advanced business logic and act in real time.
+
+## Prerequisites
+
+This walkthrough assumes that you have some familiarity with Java or Scala, but you should be able to follow along even if you are coming from a different programming language.
+
+## Help, I’m Stuck!
+
+If you get stuck, check out the [community support resources](https://flink.apache.org/community.html).
+In particular, Apache Flink's [user mailing list](https://flink.apache.org/community.html#mailing-lists) is consistently ranked as one of the most active of any Apache project and a great way to get help quickly.
+
+## How To Follow Along
+
+If you want to follow along, you will need a computer with:
+
+* Java 8
+* Maven
+
+A provided Flink Maven Archetype quickly creates a skeleton project with all the necessary dependencies:
+
+{% panel **Note:** For brevity, code blocks within this walkthrough may not contain the full surrounding class. The full code is available [at the bottom of the page](#final-application). %}
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight bash %}
+$ mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.flink \
+    -DarchetypeArtifactId=flink-walkthrough-datastream-java \{% unless site.is_stable %}
+    -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
+    -DarchetypeVersion={{ site.version }} \
+    -DgroupId=frauddetection \
+    -DartifactId=frauddetection \
+    -Dversion=0.1 \
+    -Dpackage=spendreport \
+    -DinteractiveMode=false
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight bash %}
+$ mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.flink \
+    -DarchetypeArtifactId=flink-walkthrough-datastream-scala \{% unless site.is_stable %}
+    -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %}
+    -DarchetypeVersion={{ site.version }} \
+    -DgroupId=frauddetection \
+    -DartifactId=frauddetection \
+    -Dversion=0.1 \
+    -Dpackage=spendreport \
+    -DinteractiveMode=false
+{% endhighlight %}
+</div>
+</div>
+
+{% unless site.is_stable %}
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+    <b>Note</b>: For Maven 3.0 or higher, it is no longer possible to specify the repository (-DarchetypeCatalog) via the command line. If you wish to use the snapshot repository, you need to add a repository entry to your settings.xml. For details about this change, please refer to the <a href="http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html">official Maven documentation</a>.
+</p>
+{% endunless %}
+
+You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
+Maven will create a project with all the dependencies to complete this tutorial.
+After importing the project into your editor, you will see a file with the following code, which you can run directly inside your IDE.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+#### FraudDetectionJob.java
+
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+import org.apache.flink.walkthrough.common.sink.AlertSink;
+import org.apache.flink.walkthrough.common.source.TransactionSource;
+
+public class FraudDetectionJob {
+
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        DataStream<Transaction> transactions = env
+            .addSource(new TransactionSource())
+            .name("transactions");
+
+        DataStream<Alert> alerts = transactions
+            .keyBy(Transaction::getAccountId)
+            .process(new FraudDetector())
+            .name("fraud-detector");
+
+        alerts
+            .addSink(new AlertSink())
+            .name("send-alerts");
+
+        env.execute("Fraud Detection");
+    }
+}
+{% endhighlight %}
+
+#### FraudDetector.java
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+
+public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final double SMALL_AMOUNT = 1.00;
+
+    public static final double LARGE_AMOUNT = 500.00;
+
+    public static final long ONE_DAY = 24 * 60 * 60 * 1000;
+
+    @Override
+    public void processElement(
+            Transaction transaction,
+            Context context,
+            Collector<Alert> collector) throws Exception {
+
+        // Placeholder logic: emit an alert for every transaction.
+        Alert alert = new Alert();
+        alert.setId(transaction.getAccountId());
+
+        collector.collect(alert);
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+#### FraudDetectionJob.scala
+
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+import org.apache.flink.walkthrough.common.sink.AlertSink
+import org.apache.flink.walkthrough.common.source.TransactionSource
+
+object FraudDetectionJob {
+  def main(args: Array[String]): Unit = {
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val transactions: DataStream[Transaction] = env
+      .addSource(new TransactionSource)
+      .name("transactions")
+
+    val alerts: DataStream[Alert] = transactions
+      .keyBy(transaction => transaction.getAccountId)
+      .process(new FraudDetector)
+      .name("fraud-detector")
+
+    alerts
+      .addSink(new AlertSink)
+      .name("send-alerts")
+
+    env.execute("Fraud Detection")
+  }
+}
+{% endhighlight %}
+
+#### FraudDetector.scala
+
+{% highlight scala %}
+package spendreport
+
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.walkthrough.common.entity.Alert
+import org.apache.flink.walkthrough.common.entity.Transaction
+
+object FraudDetector {
+
+  val SMALL_AMOUNT: Double = 1.00
+
+  val LARGE_AMOUNT: Double = 500.00
+
+  val ONE_DAY: Long = 24 * 60 * 60 * 1000L
+}
+
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
+
+  override def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]): Unit = {
+
+    // Placeholder logic: emit an alert for every transaction.
+    val alert = new Alert
+    alert.setId(transaction.getAccountId)
+
+    collector.collect(alert)
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
+## Breaking Down The Code
+
+#### The Execution Environment
+
+The first line sets up your `StreamExecutionEnvironment`.
+The execution environment is how you set properties for your Job and create your sources.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+{% endhighlight %}
+</div>
+</div>
+
+#### Creating A Source
+
+Sources ingest data from external systems, such as Apache Kafka, RabbitMQ, or Apache Pulsar, into Flink Jobs.
+This walkthrough uses a source that generates an infinite stream of credit card transactions for you to process.
+Each transaction contains an account ID (`accountId`), a timestamp (`timestamp`) of when the transaction occurred, and a US$ amount (`amount`).
+The `name` attached to the source is for debugging purposes only, so if something goes wrong, you will know where the error originated.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Transaction> transactions = env
+    .addSource(new TransactionSource())
+    .name("transactions");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val transactions = env
+  .addSource(new TransactionSource)
+  .name("transactions")
+{% endhighlight %}
+</div>
+</div>
+
+#### Partitioning Events & Detecting Fraud
+
+The stream contains transactions from a large number of users; however, fraud occurs on a per-account basis. To detect fraud, you must ensure that the same instance of the fraud detector processes every event for a given account.
+
+Streams can be partitioned using `DataStream#keyBy` to ensure that all records with the same key are processed by the same parallel instance of the downstream operator.
+It is common to say the operator immediately after a `keyBy` is executed within a _keyed context_.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Alert> alerts = transactions
+    .keyBy(Transaction::getAccountId)
+    .process(new FraudDetector())
+    .name("fraud-detector");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val alerts = transactions
+  .keyBy(transaction => transaction.getAccountId)
+  .process(new FraudDetector)
+  .name("fraud-detector")
+{% endhighlight %}
+</div>
+</div>
+
+#### Outputting Results
+
+Sinks send a Flink Job's events to external systems, such as Apache Kafka, Cassandra, and AWS Kinesis.

Review comment:
You're right. `'s` is possessive, `s` is plural.
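The quoted "Partitioning Events & Detecting Fraud" text says that `keyBy` sends every record for an account to the same operator instance, and that the operator then runs in a keyed context. The sketch below is a plain-Java simulation of that idea, not the Flink API: it routes each transaction to one of several detector instances by account ID and keeps state in a map keyed by account, mirroring how keyed state is scoped to the current key. Everything in it is hypothetical: the `Txn` class, the `route` function (a simple `Math.floorMod` of the key's hash; Flink's actual key-group assignment differs), and the "small purchase followed by a large one" rule, which is only an assumption built from the `SMALL_AMOUNT` and `LARGE_AMOUNT` constants in the quoted `FraudDetector`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java simulation of keyed processing; no Flink classes are used. */
public class KeyedFraudSketch {

    static final double SMALL_AMOUNT = 1.00;
    static final double LARGE_AMOUNT = 500.00;

    /** Hypothetical stand-in for the walkthrough's Transaction entity. */
    static final class Txn {
        final long accountId;
        final double amount;

        Txn(long accountId, double amount) {
            this.accountId = accountId;
            this.amount = amount;
        }
    }

    /** One parallel detector instance; the map plays the role of keyed state. */
    static final class DetectorInstance {
        private final Map<Long, Boolean> lastWasSmall = new HashMap<>();

        /** Returns true if this transaction is large and the account's previous one was small. */
        boolean process(Txn txn) {
            boolean flagged = lastWasSmall.getOrDefault(txn.accountId, false)
                    && txn.amount > LARGE_AMOUNT;
            // Remember only whether this transaction was small, per account.
            lastWasSmall.put(txn.accountId, txn.amount < SMALL_AMOUNT);
            return flagged;
        }
    }

    /** Hypothetical routing: depends only on the key, so a key always hits the same instance. */
    static int route(long accountId, int parallelism) {
        return Math.floorMod(Long.hashCode(accountId), parallelism);
    }

    /** Processes transactions in order and returns the account IDs that were flagged. */
    static List<Long> detect(List<Txn> txns, int parallelism) {
        List<DetectorInstance> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new DetectorInstance());
        }
        List<Long> flagged = new ArrayList<>();
        for (Txn txn : txns) {
            DetectorInstance instance = instances.get(route(txn.accountId, parallelism));
            if (instance.process(txn)) {
                flagged.add(txn.accountId);
            }
        }
        return flagged;
    }

    public static void main(String[] args) {
        List<Txn> txns = Arrays.asList(
            new Txn(1, 0.50),    // account 1: small test purchase
            new Txn(2, 0.75),    // account 2: small test purchase
            new Txn(2, 20.00),   // account 2: ordinary purchase resets its state
            new Txn(1, 800.00),  // account 1: large purchase right after a small one
            new Txn(2, 900.00)); // account 2: large, but its previous purchase was not small
        System.out.println(detect(txns, 4)); // prints [1]
    }
}
```

Because `route` depends only on the account ID and the state map is keyed by account, the result is the same for any parallelism; this is the property the quoted text relies on when it says the same detector instance must see every event for a given account.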