This is an automated email from the ASF dual-hosted git repository. fhueske pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit df8f9a586143bbd719b6e9f03592e02e45629a9a Author: Seth Wiesman <sjwies...@gmail.com> AuthorDate: Mon Jul 22 16:01:44 2019 -0500 [FLINK-12746][docs] Add DataStream API Walkthrough This closes #9201. --- docs/fig/fraud-transactions.svg | 71 ++ .../getting-started/walkthroughs/datastream_api.md | 925 +++++++++++++++++++++ .../walkthroughs/datastream_api.zh.md | 925 +++++++++++++++++++++ docs/getting-started/walkthroughs/table_api.md | 2 +- docs/getting-started/walkthroughs/table_api.zh.md | 2 +- flink-end-to-end-tests/run-nightly-tests.sh | 2 + flink-end-to-end-tests/test-scripts/common.sh | 12 + flink-end-to-end-tests/test-scripts/test_cli.sh | 11 - ...throughs.sh => test_datastream_walkthroughs.sh} | 35 +- .../test-scripts/test_table_walkthroughs.sh | 1 + .../flink/walkthrough/common/entity/Alert.java | 61 ++ .../flink/walkthrough/common/sink/AlertSink.java | 43 + .../flink-walkthrough-datastream-java/pom.xml | 37 + .../META-INF/maven/archetype-metadata.xml | 36 + .../src/main/resources/archetype-resources/pom.xml | 225 +++++ .../src/main/java/FraudDetectionJob.java | 50 ++ .../src/main/java/FraudDetector.java | 48 ++ .../src/main/resources/log4j.properties | 24 + .../flink-walkthrough-datastream-scala/pom.xml | 37 + .../META-INF/maven/archetype-metadata.xml | 36 + .../src/main/resources/archetype-resources/pom.xml | 256 ++++++ .../src/main/resources/log4j.properties | 24 + .../src/main/scala/FraudDetectionJob.scala | 51 ++ .../src/main/scala/FraudDetector.scala | 49 ++ flink-walkthroughs/pom.xml | 2 + 25 files changed, 2945 insertions(+), 20 deletions(-) diff --git a/docs/fig/fraud-transactions.svg b/docs/fig/fraud-transactions.svg new file mode 100644 index 0000000..f8e59d9 --- /dev/null +++ b/docs/fig/fraud-transactions.svg @@ -0,0 +1,71 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<svg version="1.1" viewBox="0 0 842.6483154296875 203.27821350097656" fill="none" stroke="none" stroke-linecap="square" stroke-miterlimit="10" xmlns:xlink="http://www.w3.org/1999/xlink" xmlns="http://www.w3.org/2000/svg" width="100%" height="100%"> + <rect id="svgEditorBackground" x="0" y="0" width="842.648" height="203.278" style="fill:none;stroke:none;" /> + <clipPath id="p.0"> + <path d="m0 0l842.6483 0l0 203.27821l-842.6483 0l0 -203.27821z" clip-rule="nonzero" /> + </clipPath> + <g clip-path="url(#p.0)"> + <path fill="#000000" fill-opacity="0.0" d="M0,0l842.6483,0l0,203.27821l-842.6483,0Z" fill-rule="evenodd" /> + <path fill="#000000" fill-opacity="0.0" d="m203.24672 147.81758l0 -46.015747l138.67717 0l0 46.015747z" fill-rule="evenodd" /> + <path fill="#000000" d="M265.14047,136.5832q-0.078125,-3,-1.625,-4.890625q-1.546875,-1.90625,-4.234375,-2.53125q-2.703125,-0.625,-9.21876499999999,-0.65625q-6.53125,-0.046875,-8.59375,-0.234375q-3.28125,-0.3593673700000011,-5.265625,-1.3124923999999965q-2,-0.953125,-3.1875,-2.359375q-1.203125,-1.40625,-1.828125,-3.59375q-0.390625,-1.484375,-0.390625,-4.84375l0,-2.1875l6.140625,0l0,1.203125q0,4.0625,1.46875,5.390625q1.453125,1.328125,6.53125,1.328125q10.234375,0,12.93751499999999, [...] 
+ <path fill="#000000" d="M608.6628,138.55434q-0.078125,-3,-1.625,-4.890625q-1.546875,-1.90625,-4.234375,-2.53125q-2.703125,-0.625,-9.21875,-0.65625q-6.53125,-0.046875,-8.59375,-0.234375q-3.28125,-0.359375,-5.265625,-1.3125q-2,-0.953125,-3.1875,-2.359375q-1.203125,-1.40625,-1.828125,-3.59375q-0.390625,-1.484375,-0.390625,-4.84375l0,-2.1875l6.140625,0l0,1.203125q0,4.0625,1.46875,5.390625q1.453125,1.328125,6.53125,1.328125q10.234375,0,12.9375,0.4375q4.171875,0.703125,6.4375,2.40625q2 [...] + <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m43.850395 19.053806l0 76.19423" fill-rule="nonzero" /> + <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m119.850395 19.053806l0 76.19423" fill-rule="nonzero" /> + <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m195.85039 19.053806l0 76.19423" fill-rule="nonzero" /> + <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m271.8504 19.053806l0 76.19423" fill-rule="nonzero" /> + <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m347.8504 19.053806l0 76.19423" fill-rule="nonzero" /> + <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m423.8504 19.053806l0 76.19423" fill-rule="nonzero" /> + <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m499.8504 19.053806l0 76.19423" fill-rule="nonzero" /> + <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m575.8504 19.053806l0 76.19423" fill-rule="nonzero" /> + <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m651.8504 19.053806l0 76.19423" fill-rule="nonzero" /> + <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m727.8504 19.053806l0 76.19423" fill-rule="nonzero" /> + <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m803.8504 19.053806l0 76.19423" fill-rule="nonzero" /> + <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m43.351707 19.552494l760.9974 0" fill-rule="nonzero" /> + <path 
stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m43.351707 54.749344l760.9974 0" fill-rule="nonzero" /> + <path stroke="#9e9e9e" stroke-width="1.0" stroke-linecap="butt" d="m43.351707 94.749344l760.9974 0" fill-rule="nonzero" /> + <path fill="#000000" d="m68.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625 0 [...] + <path fill="#000000" d="m144.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625 [...] + <path fill="#000000" d="m220.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625 [...] 
+ <path fill="#000000" d="m296.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625 [...] + <path fill="#000000" d="m372.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625 [...] + <path fill="#000000" d="m448.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625 [...] + <path fill="#000000" d="m524.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625 [...] 
+ <path fill="#000000" d="m600.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625 [...] + <path fill="#000000" d="m676.63684 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625 [...] + <path fill="#000000" d="m748.9306 41.950916l0 -8.421875l-3.140625 0l0 -1.125l7.5625 0l0 1.125l-3.15625 0l0 8.421875l-1.265625 0zm4.7819824 0l2.53125 -3.59375l-2.34375 -3.3125l1.46875 0l1.0625 1.609375q0.296875 0.46875 0.484375 0.78125q0.28125 -0.4375 0.515625 -0.765625l1.171875 -1.625l1.40625 0l-2.390625 3.25l2.5625 3.65625l-1.4375 0l-1.421875 -2.140625l-0.375 -0.59375l-1.8125 2.734375l-1.421875 0zm7.4453125 0l0 -6.90625l1.0625 0l0 0.984375q0.75 -1.140625 2.1875 -1.140625q0.625 0 [...] + <path fill="#000000" d="m64.78021 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.60937 [...] 
+ <path fill="#000000" d="m140.78021 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.6093 [...] + <path fill="#000000" d="m220.48645 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.6093 [...] + <path fill="#000000" d="m289.07397 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.6093 [...] + <path fill="#000000" d="m365.07397 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.6093 [...] 
+ <path fill="#000000" d="m444.7802 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.60937 [...] + <path fill="#000000" d="m524.48645 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.6093 [...] + <path fill="#000000" d="m596.7802 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.60937 [...] + <path fill="#000000" d="m669.074 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.609375 [...] 
+ <path fill="#000000" d="m748.7802 77.92434l0 -1.171875q-0.875 -0.109375 -1.421875 -0.390625q-0.546875 -0.28125 -0.953125 -0.90625q-0.390625 -0.640625 -0.46875 -1.5625l1.1875 -0.21875q0.140625 0.953125 0.484375 1.390625q0.484375 0.625 1.171875 0.703125l0 -3.734375q-0.71875 -0.140625 -1.46875 -0.5625q-0.5625 -0.3125 -0.875 -0.859375q-0.296875 -0.5625 -0.296875 -1.265625q0 -1.25 0.890625 -2.015625q0.59375 -0.53125 1.75 -0.640625l0 -0.5625l0.703125 0l0 0.5625q1.015625 0.09375 1.60937 [...] + <path fill="#000000" fill-opacity="0.0" d="m236.08136 158.07217l73.00787 0l0 68.28346l-73.00787 0z" fill-rule="evenodd" /> + <path fill="#000000" d="m246.61261 184.99217l0 -13.359375l9.015625 0l0 1.578125l-7.25 0l0 4.140625l6.265625 0l0 1.578125l-6.265625 0l0 6.0625l-1.765625 0zm11.083496 0l0 -9.671875l1.46875 0l0 1.46875q0.5625 -1.03125 1.03125 -1.359375q0.484375 -0.328125 1.0625 -0.328125q0.828125 0 1.6875 0.53125l-0.5625 1.515625q-0.609375 -0.359375 -1.203125 -0.359375q-0.546875 0 -0.96875 0.328125q-0.421875 0.328125 -0.609375 0.890625q-0.28125 0.875 -0.28125 1.921875l0 5.0625l-1.625 0zm12.540802 -1 [...] + <path fill="#000000" fill-opacity="0.0" d="m563.58795 160.0433l105.03937 0l0 68.28346l-105.03937 0z" fill-rule="evenodd" /> + <path fill="#000000" d="m574.0098 186.9633l0 -13.359375l1.8125 0l7.015625 10.484375l0 -10.484375l1.6875 0l0 13.359375l-1.8125 0l-7.015625 -10.5l0 10.5l-1.6875 0zm12.676025 -4.84375q0 -2.6875 1.484375 -3.96875q1.25 -1.078125 3.046875 -1.078125q2.0 0 3.265625 1.3125q1.265625 1.296875 1.265625 3.609375q0 1.859375 -0.5625 2.9375q-0.5625 1.0625 -1.640625 1.65625q-1.0625 0.59375 -2.328125 0.59375q-2.03125 0 -3.28125 -1.296875q-1.25 -1.3125 -1.25 -3.765625zm1.6875 0q0 1.859375 0.796875 [...] 
+ </g> +</svg> diff --git a/docs/getting-started/walkthroughs/datastream_api.md b/docs/getting-started/walkthroughs/datastream_api.md new file mode 100644 index 0000000..8676ae4 --- /dev/null +++ b/docs/getting-started/walkthroughs/datastream_api.md @@ -0,0 +1,925 @@ +--- +title: "DataStream API" +nav-id: datastreamwalkthrough +nav-title: 'DataStream API' +nav-parent_id: walkthroughs +nav-pos: 1 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +Apache Flink offers a DataStream API for building robust, stateful streaming applications. +It provides fine-grained control over state and time, which allows for the implementation of advanced event-driven systems. +In this step-by-step guide you'll learn how to build a stateful streaming application with Flink's DataStream API. + +* This will be replaced by the TOC +{:toc} + +## What Are You Building? + +Credit card fraud is a growing concern in the digital age. +Criminals steal credit card numbers by running scams or hacking into insecure systems. +Stolen numbers are tested by making one or more small purchases, often for a dollar or less. +If that works, they then make more significant purchases to get items they can sell or keep for themselves. 
+ +In this tutorial, you will build a fraud detection system for alerting on suspicious credit card transactions. +Using a simple set of rules, you will see how Flink allows us to implement advanced business logic and act in real-time. + +## Prerequisites + +This walkthrough assumes that you have some familiarity with Java or Scala, but you should be able to follow along even if you are coming from a different programming language. + +## Help, I’m Stuck! + +If you get stuck, check out the [community support resources](https://flink.apache.org/gettinghelp.html). +In particular, Apache Flink's [user mailing list](https://flink.apache.org/community.html#mailing-lists) is consistently ranked as one of the most active of any Apache project and a great way to get help quickly. + +## How to Follow Along + +If you want to follow along, you will require a computer with: + +* Java 8 +* Maven + +A provided Flink Maven Archetype will create a skeleton project with all the necessary dependencies quickly, so you only need to focus on filling out the business logic. +These dependencies include `flink-streaming-java` which is the core dependency for all Flink streaming applications and `flink-walkthrough-common` that has data generators and other classes specific to this walkthrough. + +{% panel **Note:** Each code block within this walkthrough may not contain the full surrounding class for brevity. The full code is available [at the bottom of the page](#final-application). 
%} + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight bash %} +$ mvn archetype:generate \ + -DarchetypeGroupId=org.apache.flink \ + -DarchetypeArtifactId=flink-walkthrough-datastream-java \{% unless site.is_stable %} + -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %} + -DarchetypeVersion={{ site.version }} \ + -DgroupId=frauddetection \ + -DartifactId=frauddetection \ + -Dversion=0.1 \ + -Dpackage=spendreport \ + -DinteractiveMode=false +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight bash %} +$ mvn archetype:generate \ + -DarchetypeGroupId=org.apache.flink \ + -DarchetypeArtifactId=flink-walkthrough-datastream-scala \{% unless site.is_stable %} + -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %} + -DarchetypeVersion={{ site.version }} \ + -DgroupId=frauddetection \ + -DartifactId=frauddetection \ + -Dversion=0.1 \ + -Dpackage=spendreport \ + -DinteractiveMode=false +{% endhighlight %} +</div> +</div> + +{% unless site.is_stable %} +<p style="border-radius: 5px; padding: 5px" class="bg-danger"> + <b>Note</b>: For Maven 3.0 or higher, it is no longer possible to specify the repository (-DarchetypeCatalog) via the commandline. If you wish to use the snapshot repository, you need to add a repository entry to your settings.xml. For details about this change, please refer to <a href="http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html">Maven official document</a> +</p> +{% endunless %} + +You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters, +Maven will create a folder named `frauddetection` that contains a project with all the dependencies to complete this tutorial. 
+ +After importing the project into your editor, you can find a file `FraudDetectionJob.java` (or `FraudDetectionJob.scala`) with the following code which you can run directly inside your IDE. +Try setting breakpoints throughout the data stream and run the code in DEBUG mode to get a feeling for how everything works. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +#### FraudDetectionJob.java + +{% highlight java %} +package spendreport; + +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.walkthrough.common.sink.AlertSink; +import org.apache.flink.walkthrough.common.entity.Alert; +import org.apache.flink.walkthrough.common.entity.Transaction; +import org.apache.flink.walkthrough.common.source.TransactionSource; + +public class FraudDetectionJob { + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Transaction> transactions = env + .addSource(new TransactionSource()) + .name("transactions"); + + DataStream<Alert> alerts = transactions + .keyBy(Transaction::getAccountId) + .process(new FraudDetector()) + .name("fraud-detector"); + + alerts + .addSink(new AlertSink()) + .name("send-alerts"); + + env.execute("Fraud Detection"); + } +} +{% endhighlight %} + +#### FraudDetector.java +{% highlight java %} +package spendreport; + +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.walkthrough.common.entity.Alert; +import org.apache.flink.walkthrough.common.entity.Transaction; + +public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> { + + private static final long serialVersionUID = 1L; + + private static final double SMALL_AMOUNT = 1.00; + private static final double LARGE_AMOUNT = 500.00; + private static
final long ONE_MINUTE = 60 * 1000; + + @Override + public void processElement( + Transaction transaction, + Context context, + Collector<Alert> collector) throws Exception { + + Alert alert = new Alert(); + alert.setId(transaction.getAccountId()); + + collector.collect(alert); + } +} +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +#### FraudDetectionJob.scala + +{% highlight scala %} +package spendreport + +import org.apache.flink.streaming.api.scala._ +import org.apache.flink.walkthrough.common.sink.AlertSink +import org.apache.flink.walkthrough.common.entity.Alert +import org.apache.flink.walkthrough.common.entity.Transaction +import org.apache.flink.walkthrough.common.source.TransactionSource + +object FraudDetectionJob { + + @throws[Exception] + def main(args: Array[String]): Unit = { + val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment + + val transactions: DataStream[Transaction] = env + .addSource(new TransactionSource) + .name("transactions") + + val alerts: DataStream[Alert] = transactions + .keyBy(transaction => transaction.getAccountId) + .process(new FraudDetector) + .name("fraud-detector") + + alerts + .addSink(new AlertSink) + .name("send-alerts") + + env.execute("Fraud Detection") + } +} +{% endhighlight %} + +#### FraudDetector.scala + +{% highlight scala %} +package spendreport + +import org.apache.flink.streaming.api.functions.KeyedProcessFunction +import org.apache.flink.util.Collector +import org.apache.flink.walkthrough.common.entity.Alert +import org.apache.flink.walkthrough.common.entity.Transaction + +object FraudDetector { + val SMALL_AMOUNT: Double = 1.00 + val LARGE_AMOUNT: Double = 500.00 + val ONE_MINUTE: Long = 60 * 1000L +} + +@SerialVersionUID(1L) +class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] { + + @throws[Exception] + def processElement( + transaction: Transaction, + context: KeyedProcessFunction[Long, Transaction, Alert]#Context, + collector: 
Collector[Alert]): Unit = { + + val alert = new Alert + alert.setId(transaction.getAccountId) + + collector.collect(alert) + } +} + +{% endhighlight %} +</div> +</div> + +## Breaking Down the Code + +Let's walk step-by-step through the code of these two files. The `FraudDetectionJob` class defines the data flow of the application and the `FraudDetector` class defines the business logic of the function that detects fraudulent transactions. + +We start describing how the Job is assembled in the `main` method of the `FraudDetectionJob` class. + +#### The Execution Environment + +The first line sets up your `StreamExecutionEnvironment`. +The execution environment is how you set properties for your Job, create your sources, and finally trigger the execution of the Job. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment +{% endhighlight %} +</div> +</div> + +#### Creating a Source + +Sources ingest data from external systems, such as Apache Kafka, Rabbit MQ, or Apache Pulsar, into Flink Jobs. +This walkthrough uses a source that generates an infinite stream of credit card transactions for you to process. +Each transaction contains an account ID (`accountId`), timestamp (`timestamp`) of when the transaction occurred, and US$ amount (`amount`). +The `name` attached to the source is just for debugging purposes, so if something goes wrong, we will know where the error originated. 
+ +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +DataStream<Transaction> transactions = env + .addSource(new TransactionSource()) + .name("transactions"); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val transactions: DataStream[Transaction] = env + .addSource(new TransactionSource) + .name("transactions") +{% endhighlight %} +</div> +</div> + + +#### Partitioning Events & Detecting Fraud + +The `transactions` stream contains a lot of transactions from a large number of users, such that it needs to be processed in parallel by multiple fraud detection tasks. Since fraud occurs on a per-account basis, you must ensure that all transactions for the same account are processed by the same parallel task of the fraud detector operator. + +To ensure that the same physical task processes all records for a particular key, you can partition a stream using `DataStream#keyBy`. +The `process()` call adds an operator that applies a function to each partitioned element in the stream. +It is common to say the operator immediately after a `keyBy`, in this case `FraudDetector`, is executed within a _keyed context_. + + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +DataStream<Alert> alerts = transactions + .keyBy(Transaction::getAccountId) + .process(new FraudDetector()) + .name("fraud-detector"); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val alerts: DataStream[Alert] = transactions + .keyBy(transaction => transaction.getAccountId) + .process(new FraudDetector) + .name("fraud-detector") +{% endhighlight %} +</div> +</div> + +#### Outputting Results + +A sink writes a `DataStream` to an external system, such as Apache Kafka, Cassandra, and AWS Kinesis.
+The `AlertSink` logs each `Alert` record with log level **INFO**, instead of writing it to persistent storage, so you can easily see your results. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +alerts.addSink(new AlertSink()); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +alerts.addSink(new AlertSink) +{% endhighlight %} +</div> +</div> + +#### Executing the Job + +Flink applications are built lazily and shipped to the cluster for execution only once fully formed. +Call `StreamExecutionEnvironment#execute` to begin the execution of our Job and give it a name. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +env.execute("Fraud Detection"); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +env.execute("Fraud Detection") +{% endhighlight %} +</div> +</div> + +#### The Fraud Detector + +The fraud detector is implemented as a `KeyedProcessFunction`. +Its method `KeyedProcessFunction#processElement` is called for every transaction event. +This first version produces an alert on every transaction, which some may say is overly conservative. + +The next steps of this tutorial will guide you to expand the fraud detector with more meaningful business logic. 
+ +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> { + + private static final double SMALL_AMOUNT = 1.00; + private static final double LARGE_AMOUNT = 500.00; + private static final long ONE_MINUTE = 60 * 1000; + + @Override + public void processElement( + Transaction transaction, + Context context, + Collector<Alert> collector) throws Exception { + + Alert alert = new Alert(); + alert.setId(transaction.getAccountId()); + + collector.collect(alert); + } +} +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +object FraudDetector { + val SMALL_AMOUNT: Double = 1.00 + val LARGE_AMOUNT: Double = 500.00 + val ONE_MINUTE: Long = 60 * 1000L +} + +@SerialVersionUID(1L) +class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] { + + @throws[Exception] + def processElement( + transaction: Transaction, + context: KeyedProcessFunction[Long, Transaction, Alert]#Context, + collector: Collector[Alert]): Unit = { + + val alert = new Alert + alert.setId(transaction.getAccountId) + + collector.collect(alert) + } +} +{% endhighlight %} +</div> +</div> + +## Writing a Real Application (v1) + +For the first version, the fraud detector should output an alert for any account that makes a small transaction immediately followed by a large one. Where small is anything less than $1.00 and large is more than $500. +Imagine your fraud detector processes the following stream of transactions for a particular account. + +<p class="text-center"> + <img alt="Transactions" width="80%" src="{{ site.baseurl }}/fig/fraud-transactions.svg"/> +</p> + +Transactions 3 and 4 should be marked as fraudulent because it is a small transaction, $0.09, followed by a large one, $510. 
+Alternatively, transactions 7, 8, and 9 are not fraud because the small amount of $0.02 is not immediately followed by the large one; instead, there is an intermediate transaction that breaks the pattern.
+
+To do this, the fraud detector must _remember_ information across events; a large transaction is only fraudulent if the previous one was small.
+Remembering information across events requires [state]({{ site.baseurl }}/concepts/glossary.html#managed-state), and that is why we decided to use a [KeyedProcessFunction]({{ site.baseurl }}/dev/stream/operators/process_function.html).
+It provides fine-grained control over both state and time, which will allow us to evolve our algorithm with more complex requirements throughout this walkthrough.
+
+The most straightforward implementation is a boolean flag that is set whenever a small transaction is processed.
+When a large transaction comes through, you can simply check if the flag is set for that account.
+
+However, merely implementing the flag as a member variable in the `FraudDetector` class will not work.
+Flink processes the transactions of multiple accounts with the same object instance of `FraudDetector`, which means if accounts A and B are routed through the same instance of `FraudDetector`, a transaction for account A could set the flag to true and then a transaction for account B could set off a false alert.
+We could of course use a data structure like a `Map` to keep track of the flags for individual keys, however, a simple member variable would not be fault-tolerant and all its information would be lost in case of a failure.
+Hence, the fraud detector would possibly miss alerts if the application ever had to restart to recover from a failure.
+
+To address these challenges, Flink provides primitives for fault-tolerant state that are almost as easy to use as regular member variables. 
+ +The most basic type of state in Flink is [ValueState]({{ site.baseurl }}/dev/stream/state/state.html#using-managed-keyed-state), a data type that adds fault tolerance to any variable it wraps. +`ValueState` is a form of _keyed state_, meaning it is only available in operators that are applied in a _keyed context_; any operator immediately following `DataStream#keyBy`. +A _keyed state_ of an operator is automatically scoped to the key of the record that is currently processed. +In this example, the key is the account id for the current transaction (as declared by `keyBy()`), and `FraudDetector` maintains an independent state for each account. +`ValueState` is created using a `ValueStateDescriptor` which contains metadata about how Flink should manage the variable. The state should be registered before the function starts processing data. +The right hook for this is the `open()` method. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> { + + private static final long serialVersionUID = 1L; + + private transient ValueState<Boolean> flagState; + + @Override + public void open(Configuration parameters) { + ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>( + "flag", + Types.BOOLEAN); + flagState = getRuntimeContext().getState(flagDescriptor); + } +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +@SerialVersionUID(1L) +class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] { + + @transient private var flagState: ValueState[java.lang.Boolean] = _ + + @throws[Exception] + override def open(parameters: Configuration): Unit = { + val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN) + flagState = getRuntimeContext.getState(flagDescriptor) + } +{% endhighlight %} +</div> +</div> + +`ValueState` is a wrapper class, similar to `AtomicReference` or 
`AtomicLong` in the Java standard library. +It provides three methods for interacting with its contents; `update` sets the state, `value` gets the current value, and `clear` deletes its contents. +If the state for a particular key is empty, such as at the beginning of an application or after calling `ValueState#clear`, then `ValueState#value` will return `null`. +Modifications to the object returned by `ValueState#value` are not guaranteed to be recognized by the system, and so all changes must be performed with `ValueState#update`. +Otherwise, fault tolerance is managed automatically by Flink under the hood, and so you can interact with it like with any standard variable. + +Below, you can see an example of how you can use a flag state to track potential fraudulent transactions. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} + @Override + public void processElement( + Transaction transaction, + Context context, + Collector<Alert> collector) throws Exception { + + // Get the current state for the current key + Boolean lastTransactionWasSmall = flagState.value(); + + // Check if the flag is set + if (lastTransactionWasSmall != null) { + if (transaction.getAmount() > LARGE_AMOUNT) { + // Output an alert downstream + Alert alert = new Alert(); + alert.setId(transaction.getAccountId()); + + collector.collect(alert); + } + + // Clean up our state + flagState.clear(); + } + + if (transaction.getAmount() < SMALL_AMOUNT) { + // Set the flag to true + flagState.update(true); + } + } +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} + override def processElement( + transaction: Transaction, + context: KeyedProcessFunction[Long, Transaction, Alert]#Context, + collector: Collector[Alert]): Unit = { + + // Get the current state for the current key + val lastTransactionWasSmall = flagState.value + + // Check if the flag is set + if (lastTransactionWasSmall != null) { + if 
(transaction.getAmount > FraudDetector.LARGE_AMOUNT) { + // Output an alert downstream + val alert = new Alert + alert.setId(transaction.getAccountId) + + collector.collect(alert) + } + // Clean up our state + flagState.clear() + } + + if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) { + // set the flag to true + flagState.update(true) + } + } +{% endhighlight %} +</div> +</div> + +For every transaction, the fraud detector checks the state of the flag for that account. +Remember, `ValueState` is always scoped to the current key, i.e., account. +If the flag is non-null, then the last transaction seen for that account was small, and so if the amount for this transaction is large, then the detector outputs a fraud alert. + +After that check, the flag state is unconditionally cleared. +Either the current transaction caused a fraud alert, and the pattern is over, or the current transaction did not cause an alert, and the pattern is broken and needs to be restarted. + +Finally, the transaction amount is checked to see if it is small. +If so, then the flag is set so that it can be checked by the next event. +Notice that `ValueState<Boolean>` actually has three states, unset ( `null`), `true`, and `false`, because all `ValueState`'s are nullable. +This job only makes use of unset ( `null`) and `true` to check whether the flag is set or not. + +## Fraud Detector v2: State + Time = ❤️ + +Scammers don't wait long to make their large purchase to reduce the chances their test transaction is noticed. +For example, suppose you wanted to set a 1 minute timeout to your fraud detector; i.e., in the previous example transactions 3 and 4 would only be considered fraud if they occurred within 1 minute of each other. +Flink's `KeyedProcessFunction` allows you to set timers which invoke a callback method at some point in time in the future. 
+ +Let's see how we can modify our Job to comply with our new requirements: + +* Whenever the flag is set to `true`, also set a timer for 1 minute in the future. +* When the timer fires, reset the flag by clearing its state. +* If the flag is ever cleared the timer should be canceled. + +To cancel a timer, you have to remember what time it is set for, and remembering implies state, so you will begin by creating a timer state along with your flag state. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} + private transient ValueState<Boolean> flagState; + private transient ValueState<Long> timerState; + + @Override + public void open(Configuration parameters) { + ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>( + "flag", + Types.BOOLEAN); + flagState = getRuntimeContext().getState(flagDescriptor); + + ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>( + "timer-state", + Types.LONG); + timerState = getRuntimeContext().getState(timerDescriptor); + } +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +@SerialVersionUID(1L) +class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] { + + @transient private var flagState: ValueState[java.lang.Boolean] = _ + @transient private var timerState: ValueState[java.lang.Long] = _ + + @throws[Exception] + override def open(parameters: Configuration): Unit = { + val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN) + flagState = getRuntimeContext.getState(flagDescriptor) + + val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG) + timerState = getRuntimeContext.getState(timerDescriptor) + } +{% endhighlight %} +</div> +</div> + +`KeyedProcessFunction#processElement` is called with a `Context` that contains a timer service. +The timer service can be used to query the current time, register timers, and delete timers. 
+With this, you can set a timer for 1 minute in the future every time the flag is set and store the timestamp in `timerState`. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +if (transaction.getAmount() < SMALL_AMOUNT) { + // set the flag to true + flagState.update(true); + + // set the timer and timer state + long timer = context.timerService().currentProcessingTime() + ONE_MINUTE; + context.timerService().registerProcessingTimeTimer(timer); + timerState.update(timer); +} +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) { + // set the flag to true + flagState.update(true) + + // set the timer and timer state + val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE + context.timerService.registerProcessingTimeTimer(timer) + timerState.update(timer) +} +{% endhighlight %} +</div> +</div> + +Processing time is wall clock time, and is determined by the system clock of the machine running the operator. + +When a timer fires, it calls `KeyedProcessFunction#onTimer`. +Overriding this method is how you can implement your callback to reset the flag. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +@Override +public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) { + // remove flag after 1 minute + timerState.clear(); + flagState.clear(); +} +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +override def onTimer( + timestamp: Long, + ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext, + out: Collector[Alert]): Unit = { + // remove flag after 1 minute + timerState.clear() + flagState.clear() +} +{% endhighlight %} +</div> +</div> + +Finally, to cancel the timer, you need to delete the registered timer and delete the timer state. 
+You can wrap this in a helper method and call this method instead of `flagState.clear()`. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +private void cleanUp(Context ctx) throws Exception { + // delete timer + Long timer = timerState.value(); + ctx.timerService().deleteProcessingTimeTimer(timer); + + // clean up all state + timerState.clear(); + flagState.clear(); +} +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +@throws[Exception] +private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = { + // delete timer + val timer = timerState.value + ctx.timerService.deleteProcessingTimeTimer(timer) + + // clean up all states + timerState.clear() + flagState.clear() +} +{% endhighlight %} +</div> +</div> + +And that's it, a fully functional, stateful, distributed streaming application! + +## Final Application + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +package spendreport; + +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.walkthrough.common.entity.Alert; +import org.apache.flink.walkthrough.common.entity.Transaction; + +public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> { + + private static final long serialVersionUID = 1L; + + private static final double SMALL_AMOUNT = 1.00; + private static final double LARGE_AMOUNT = 500.00; + private static final long ONE_MINUTE = 60 * 1000; + + private transient ValueState<Boolean> flagState; + private transient ValueState<Long> timerState; + + @Override + public void open(Configuration parameters) { + 
ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>( + "flag", + Types.BOOLEAN); + flagState = getRuntimeContext().getState(flagDescriptor); + + ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>( + "timer-state", + Types.LONG); + timerState = getRuntimeContext().getState(timerDescriptor); + } + + @Override + public void processElement( + Transaction transaction, + Context context, + Collector<Alert> collector) throws Exception { + + // Get the current state for the current key + Boolean lastTransactionWasSmall = flagState.value(); + + // Check if the flag is set + if (lastTransactionWasSmall != null) { + if (transaction.getAmount() > LARGE_AMOUNT) { + //Output an alert downstream + Alert alert = new Alert(); + alert.setId(transaction.getAccountId()); + + collector.collect(alert); + } + // Clean up our state + cleanUp(context); + } + + if (transaction.getAmount() < SMALL_AMOUNT) { + // set the flag to true + flagState.update(true); + + long timer = context.timerService().currentProcessingTime() + ONE_MINUTE; + context.timerService().registerProcessingTimeTimer(timer); + + timerState.update(timer); + } + } + + @Override + public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) { + // remove flag after 1 minute + timerState.clear(); + flagState.clear(); + } + + private void cleanUp(Context ctx) throws Exception { + // delete timer + Long timer = timerState.value(); + ctx.timerService().deleteProcessingTimeTimer(timer); + + // clean up all state + timerState.clear(); + flagState.clear(); + } +} +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +package spendreport + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.scala.typeutils.Types +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.KeyedProcessFunction +import org.apache.flink.util.Collector +import 
org.apache.flink.walkthrough.common.entity.Alert +import org.apache.flink.walkthrough.common.entity.Transaction + +object FraudDetector { + val SMALL_AMOUNT: Double = 1.00 + val LARGE_AMOUNT: Double = 500.00 + val ONE_MINUTE: Long = 60 * 1000L +} + +@SerialVersionUID(1L) +class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] { + + @transient private var flagState: ValueState[java.lang.Boolean] = _ + @transient private var timerState: ValueState[java.lang.Long] = _ + + @throws[Exception] + override def open(parameters: Configuration): Unit = { + val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN) + flagState = getRuntimeContext.getState(flagDescriptor) + + val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG) + timerState = getRuntimeContext.getState(timerDescriptor) + } + + override def processElement( + transaction: Transaction, + context: KeyedProcessFunction[Long, Transaction, Alert]#Context, + collector: Collector[Alert]): Unit = { + + // Get the current state for the current key + val lastTransactionWasSmall = flagState.value + + // Check if the flag is set + if (lastTransactionWasSmall != null) { + if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) { + // Output an alert downstream + val alert = new Alert + alert.setId(transaction.getAccountId) + + collector.collect(alert) + } + // Clean up our state + cleanUp(context) + } + + if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) { + // set the flag to true + flagState.update(true) + val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE + + context.timerService.registerProcessingTimeTimer(timer) + timerState.update(timer) + } + } + + override def onTimer( + timestamp: Long, + ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext, + out: Collector[Alert]): Unit = { + // remove flag after 1 minute + timerState.clear() + flagState.clear() + } + + @throws[Exception] + private def cleanUp(ctx: 
KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = { + // delete timer + val timer = timerState.value + ctx.timerService.deleteProcessingTimeTimer(timer) + + // clean up all states + timerState.clear() + flagState.clear() + } +} +{% endhighlight %} +</div> +</div> + +### Expected Output + +Running this code with the provided `TransactionSource` will emit fraud alerts for account 3. +You should see the following output in your task manager logs: + +{% highlight bash %} +2019-08-19 14:22:06,220 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3} +2019-08-19 14:22:11,383 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3} +2019-08-19 14:22:16,551 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3} +2019-08-19 14:22:21,723 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3} +2019-08-19 14:22:26,896 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3} +{% endhighlight %} diff --git a/docs/getting-started/walkthroughs/datastream_api.zh.md b/docs/getting-started/walkthroughs/datastream_api.zh.md new file mode 100644 index 0000000..8676ae4 --- /dev/null +++ b/docs/getting-started/walkthroughs/datastream_api.zh.md @@ -0,0 +1,925 @@ +--- +title: "DataStream API" +nav-id: datastreamwalkthrough +nav-title: 'DataStream API' +nav-parent_id: walkthroughs +nav-pos: 1 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +Apache Flink offers a DataStream API for building robust, stateful streaming applications. +It provides fine-grained control over state and time, which allows for the implementation of advanced event-driven systems. +In this step-by-step guide you'll learn how to build a stateful streaming application with Flink's DataStream API. + +* This will be replaced by the TOC +{:toc} + +## What Are You Building? + +Credit card fraud is a growing concern in the digital age. +Criminals steal credit card numbers by running scams or hacking into insecure systems. +Stolen numbers are tested by making one or more small purchases, often for a dollar or less. +If that works, they then make more significant purchases to get items they can sell or keep for themselves. + +In this tutorial, you will build a fraud detection system for alerting on suspicious credit card transactions. +Using a simple set of rules, you will see how Flink allows us to implement advanced business logic and act in real-time. + +## Prerequisites + +This walkthrough assumes that you have some familiarity with Java or Scala, but you should be able to follow along even if you are coming from a different programming language. + +## Help, I’m Stuck! + +If you get stuck, check out the [community support resources](https://flink.apache.org/gettinghelp.html). +In particular, Apache Flink's [user mailing list](https://flink.apache.org/community.html#mailing-lists) is consistently ranked as one of the most active of any Apache project and a great way to get help quickly. 
+ +## How to Follow Along + +If you want to follow along, you will require a computer with: + +* Java 8 +* Maven + +A provided Flink Maven Archetype will create a skeleton project with all the necessary dependencies quickly, so you only need to focus on filling out the business logic. +These dependencies include `flink-streaming-java` which is the core dependency for all Flink streaming applications and `flink-walkthrough-common` that has data generators and other classes specific to this walkthrough. + +{% panel **Note:** Each code block within this walkthrough may not contain the full surrounding class for brevity. The full code is available [at the bottom of the page](#final-application). %} + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight bash %} +$ mvn archetype:generate \ + -DarchetypeGroupId=org.apache.flink \ + -DarchetypeArtifactId=flink-walkthrough-datastream-java \{% unless site.is_stable %} + -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %} + -DarchetypeVersion={{ site.version }} \ + -DgroupId=frauddetection \ + -DartifactId=frauddetection \ + -Dversion=0.1 \ + -Dpackage=spendreport \ + -DinteractiveMode=false +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight bash %} +$ mvn archetype:generate \ + -DarchetypeGroupId=org.apache.flink \ + -DarchetypeArtifactId=flink-walkthrough-datastream-scala \{% unless site.is_stable %} + -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %} + -DarchetypeVersion={{ site.version }} \ + -DgroupId=frauddetection \ + -DartifactId=frauddetection \ + -Dversion=0.1 \ + -Dpackage=spendreport \ + -DinteractiveMode=false +{% endhighlight %} +</div> +</div> + +{% unless site.is_stable %} +<p style="border-radius: 5px; padding: 5px" class="bg-danger"> + <b>Note</b>: For Maven 3.0 or higher, it is no longer possible to specify the repository (-DarchetypeCatalog) via the 
commandline. If you wish to use the snapshot repository, you need to add a repository entry to your settings.xml. For details about this change, please refer to <a href="http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html">Maven official document</a>
+</p>
+{% endunless %}
+
+You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
+Maven will create a folder named `frauddetection` that contains a project with all the dependencies to complete this tutorial.
+After importing the project into your editor, you can find a file `FraudDetectionJob.java` (or `FraudDetectionJob.scala`) with the following code which you can run directly inside your IDE.
+Try setting breakpoints throughout the data stream and run the code in DEBUG mode to get a feeling for how everything works.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+#### FraudDetectionJob.java
+
+{% highlight java %}
+package spendreport;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.walkthrough.common.sink.AlertSink;
+import org.apache.flink.walkthrough.common.entity.Alert;
+import org.apache.flink.walkthrough.common.entity.Transaction;
+import org.apache.flink.walkthrough.common.source.TransactionSource;
+
+public class FraudDetectionJob {
+
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        DataStream<Transaction> transactions = env
+            .addSource(new TransactionSource())
+            .name("transactions");
+
+        DataStream<Alert> alerts = transactions
+            .keyBy(Transaction::getAccountId)
+            .process(new FraudDetector())
+            .name("fraud-detector");
+
+        alerts
+            .addSink(new AlertSink())
+            .name("send-alerts");
+
+        env.execute("Fraud Detection");
+    }
+}
+{% endhighlight %}
+
+#### FraudDetector.java
+{% highlight 
java %} +package spendreport; + +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.walkthrough.common.entity.Alert; +import org.apache.flink.walkthrough.common.entity.Transaction; + +public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> { + + private static final long serialVersionUID = 1L; + + private static final double SMALL_AMOUNT = 1.00; + private static final double LARGE_AMOUNT = 500.00; + private static final long ONE_MINUTE = 60 * 1000; + + @Override + public void processElement( + Transaction transaction, + Context context, + Collector<Alert> collector) throws Exception { + + Alert alert = new Alert(); + alert.setId(transaction.getAccountId()); + + collector.collect(alert); + } +} +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +#### FraudDetectionJob.scala + +{% highlight scala %} +package spendreport + +import org.apache.flink.streaming.api.scala._ +import org.apache.flink.walkthrough.common.sink.AlertSink +import org.apache.flink.walkthrough.common.entity.Alert +import org.apache.flink.walkthrough.common.entity.Transaction +import org.apache.flink.walkthrough.common.source.TransactionSource + +object FraudDetectionJob { + + @throws[Exception] + def main(args: Array[String]): Unit = { + val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment + + val transactions: DataStream[Transaction] = env + .addSource(new TransactionSource) + .name("transactions") + + val alerts: DataStream[Alert] = transactions + .keyBy(transaction => transaction.getAccountId) + .process(new FraudDetector) + .name("fraud-detector") + + alerts + .addSink(new AlertSink) + .name("send-alerts") + + env.execute("Fraud Detection") + } +} +{% endhighlight %} + +#### FraudDetector.scala + +{% highlight scala %} +package spendreport + +import org.apache.flink.streaming.api.functions.KeyedProcessFunction +import 
org.apache.flink.util.Collector +import org.apache.flink.walkthrough.common.entity.Alert +import org.apache.flink.walkthrough.common.entity.Transaction + +object FraudDetector { + val SMALL_AMOUNT: Double = 1.00 + val LARGE_AMOUNT: Double = 500.00 + val ONE_MINUTE: Long = 60 * 1000L +} + +@SerialVersionUID(1L) +class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] { + + @throws[Exception] + def processElement( + transaction: Transaction, + context: KeyedProcessFunction[Long, Transaction, Alert]#Context, + collector: Collector[Alert]): Unit = { + + val alert = new Alert + alert.setId(transaction.getAccountId) + + collector.collect(alert) + } +} + +{% endhighlight %} +</div> +</div> + +## Breaking Down the Code + +Let's walk step-by-step through the code of these two files. The `FraudDetectionJob` class defines the data flow of the application and the `FraudDetector` class defines the business logic of the function that detects fraudulent transactions. + +We start describing how the Job is assembled in the `main` method of the `FraudDetectionJob` class. + +#### The Execution Environment + +The first line sets up your `StreamExecutionEnvironment`. +The execution environment is how you set properties for your Job, create your sources, and finally trigger the execution of the Job. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment +{% endhighlight %} +</div> +</div> + +#### Creating a Source + +Sources ingest data from external systems, such as Apache Kafka, Rabbit MQ, or Apache Pulsar, into Flink Jobs. +This walkthrough uses a source that generates an infinite stream of credit card transactions for you to process. 
+Each transaction contains an account ID (`accountId`), timestamp (`timestamp`) of when the transaction occurred, and US$ amount (`amount`).
+The `name` attached to the source is just for debugging purposes, so if something goes wrong, we will know where the error originated.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Transaction> transactions = env
+    .addSource(new TransactionSource())
+    .name("transactions");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val transactions: DataStream[Transaction] = env
+  .addSource(new TransactionSource)
+  .name("transactions")
+{% endhighlight %}
+</div>
+</div>
+
+
+#### Partitioning Events & Detecting Fraud
+
+The `transactions` stream contains a lot of transactions from a large number of users, such that it needs to be processed in parallel by multiple fraud detection tasks. Since fraud occurs on a per-account basis, you must ensure that all transactions for the same account are processed by the same parallel task of the fraud detector operator.
+
+To ensure that the same physical task processes all records for a particular key, you can partition a stream using `DataStream#keyBy`.
+The `process()` call adds an operator that applies a function to each partitioned element in the stream.
+It is common to say the operator immediately after a `keyBy`, in this case `FraudDetector`, is executed within a _keyed context_. 
+ + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +DataStream<Alert> alerts = transactions + .keyBy(Transaction::getAccountId) + .process(new FraudDetector()) + .name("fraud-detector"); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val alerts: DataStream[Alert] = transactions + .keyBy(transaction => transaction.getAccountId) + .process(new FraudDetector) + .name("fraud-detector") +{% endhighlight %} +</div> +</div> + +#### Outputting Results + +A sink writes a `DataStream` to an external system; such as Apache Kafka, Cassandra, and AWS Kinesis. +The `AlertSink` logs each `Alert` record with log level **INFO**, instead of writing it to persistent storage, so you can easily see your results. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +alerts.addSink(new AlertSink()); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +alerts.addSink(new AlertSink) +{% endhighlight %} +</div> +</div> + +#### Executing the Job + +Flink applications are built lazily and shipped to the cluster for execution only once fully formed. +Call `StreamExecutionEnvironment#execute` to begin the execution of our Job and give it a name. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +env.execute("Fraud Detection"); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +env.execute("Fraud Detection") +{% endhighlight %} +</div> +</div> + +#### The Fraud Detector + +The fraud detector is implemented as a `KeyedProcessFunction`. +Its method `KeyedProcessFunction#processElement` is called for every transaction event. +This first version produces an alert on every transaction, which some may say is overly conservative. 
+ +The next steps of this tutorial will guide you to expand the fraud detector with more meaningful business logic. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> { + + private static final double SMALL_AMOUNT = 1.00; + private static final double LARGE_AMOUNT = 500.00; + private static final long ONE_MINUTE = 60 * 1000; + + @Override + public void processElement( + Transaction transaction, + Context context, + Collector<Alert> collector) throws Exception { + + Alert alert = new Alert(); + alert.setId(transaction.getAccountId()); + + collector.collect(alert); + } +} +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +object FraudDetector { + val SMALL_AMOUNT: Double = 1.00 + val LARGE_AMOUNT: Double = 500.00 + val ONE_MINUTE: Long = 60 * 1000L +} + +@SerialVersionUID(1L) +class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] { + + @throws[Exception] + def processElement( + transaction: Transaction, + context: KeyedProcessFunction[Long, Transaction, Alert]#Context, + collector: Collector[Alert]): Unit = { + + val alert = new Alert + alert.setId(transaction.getAccountId) + + collector.collect(alert) + } +} +{% endhighlight %} +</div> +</div> + +## Writing a Real Application (v1) + +For the first version, the fraud detector should output an alert for any account that makes a small transaction immediately followed by a large one. Where small is anything less than $1.00 and large is more than $500. +Imagine your fraud detector processes the following stream of transactions for a particular account. + +<p class="text-center"> + <img alt="Transactions" width="80%" src="{{ site.baseurl }}/fig/fraud-transactions.svg"/> +</p> + +Transactions 3 and 4 should be marked as fraudulent because it is a small transaction, $0.09, followed by a large one, $510. 
+Alternatively, transactions 7, 8, and 9 are not fraud because the small amount of $0.02 is not immediately followed by the large one; instead, there is an intermediate transaction that breaks the pattern.
+
+To do this, the fraud detector must _remember_ information across events; a large transaction is only fraudulent if the previous one was small.
+Remembering information across events requires [state]({{ site.baseurl }}/concepts/glossary.html#managed-state), and that is why we decided to use a [KeyedProcessFunction]({{ site.baseurl }}/dev/stream/operators/process_function.html).
+It provides fine-grained control over both state and time, which will allow us to evolve our algorithm with more complex requirements throughout this walkthrough.
+
+The most straightforward implementation is a boolean flag that is set whenever a small transaction is processed.
+When a large transaction comes through, you can simply check if the flag is set for that account.
+
+However, merely implementing the flag as a member variable in the `FraudDetector` class will not work.
+Flink processes the transactions of multiple accounts with the same object instance of `FraudDetector`, which means if accounts A and B are routed through the same instance of `FraudDetector`, a transaction for account A could set the flag to true and then a transaction for account B could set off a false alert.
+We could of course use a data structure like a `Map` to keep track of the flags for individual keys, however, a simple member variable would not be fault-tolerant and all its information would be lost in case of a failure.
+Hence, the fraud detector would possibly miss alerts if the application ever had to restart to recover from a failure.
+
+To address these challenges, Flink provides primitives for fault-tolerant state that are almost as easy to use as regular member variables.
+ +The most basic type of state in Flink is [ValueState]({{ site.baseurl }}/dev/stream/state/state.html#using-managed-keyed-state), a data type that adds fault tolerance to any variable it wraps. +`ValueState` is a form of _keyed state_, meaning it is only available in operators that are applied in a _keyed context_; any operator immediately following `DataStream#keyBy`. +A _keyed state_ of an operator is automatically scoped to the key of the record that is currently processed. +In this example, the key is the account id for the current transaction (as declared by `keyBy()`), and `FraudDetector` maintains an independent state for each account. +`ValueState` is created using a `ValueStateDescriptor` which contains metadata about how Flink should manage the variable. The state should be registered before the function starts processing data. +The right hook for this is the `open()` method. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> { + + private static final long serialVersionUID = 1L; + + private transient ValueState<Boolean> flagState; + + @Override + public void open(Configuration parameters) { + ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>( + "flag", + Types.BOOLEAN); + flagState = getRuntimeContext().getState(flagDescriptor); + } +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +@SerialVersionUID(1L) +class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] { + + @transient private var flagState: ValueState[java.lang.Boolean] = _ + + @throws[Exception] + override def open(parameters: Configuration): Unit = { + val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN) + flagState = getRuntimeContext.getState(flagDescriptor) + } +{% endhighlight %} +</div> +</div> + +`ValueState` is a wrapper class, similar to `AtomicReference` or 
`AtomicLong` in the Java standard library. +It provides three methods for interacting with its contents; `update` sets the state, `value` gets the current value, and `clear` deletes its contents. +If the state for a particular key is empty, such as at the beginning of an application or after calling `ValueState#clear`, then `ValueState#value` will return `null`. +Modifications to the object returned by `ValueState#value` are not guaranteed to be recognized by the system, and so all changes must be performed with `ValueState#update`. +Otherwise, fault tolerance is managed automatically by Flink under the hood, and so you can interact with it like with any standard variable. + +Below, you can see an example of how you can use a flag state to track potential fraudulent transactions. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} + @Override + public void processElement( + Transaction transaction, + Context context, + Collector<Alert> collector) throws Exception { + + // Get the current state for the current key + Boolean lastTransactionWasSmall = flagState.value(); + + // Check if the flag is set + if (lastTransactionWasSmall != null) { + if (transaction.getAmount() > LARGE_AMOUNT) { + // Output an alert downstream + Alert alert = new Alert(); + alert.setId(transaction.getAccountId()); + + collector.collect(alert); + } + + // Clean up our state + flagState.clear(); + } + + if (transaction.getAmount() < SMALL_AMOUNT) { + // Set the flag to true + flagState.update(true); + } + } +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} + override def processElement( + transaction: Transaction, + context: KeyedProcessFunction[Long, Transaction, Alert]#Context, + collector: Collector[Alert]): Unit = { + + // Get the current state for the current key + val lastTransactionWasSmall = flagState.value + + // Check if the flag is set + if (lastTransactionWasSmall != null) { + if 
(transaction.getAmount > FraudDetector.LARGE_AMOUNT) { + // Output an alert downstream + val alert = new Alert + alert.setId(transaction.getAccountId) + + collector.collect(alert) + } + // Clean up our state + flagState.clear() + } + + if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) { + // set the flag to true + flagState.update(true) + } + } +{% endhighlight %} +</div> +</div> + +For every transaction, the fraud detector checks the state of the flag for that account. +Remember, `ValueState` is always scoped to the current key, i.e., account. +If the flag is non-null, then the last transaction seen for that account was small, and so if the amount for this transaction is large, then the detector outputs a fraud alert. + +After that check, the flag state is unconditionally cleared. +Either the current transaction caused a fraud alert, and the pattern is over, or the current transaction did not cause an alert, and the pattern is broken and needs to be restarted. + +Finally, the transaction amount is checked to see if it is small. +If so, then the flag is set so that it can be checked by the next event. +Notice that `ValueState<Boolean>` actually has three states, unset ( `null`), `true`, and `false`, because all `ValueState`'s are nullable. +This job only makes use of unset ( `null`) and `true` to check whether the flag is set or not. + +## Fraud Detector v2: State + Time = ❤️ + +Scammers don't wait long to make their large purchase to reduce the chances their test transaction is noticed. +For example, suppose you wanted to set a 1 minute timeout to your fraud detector; i.e., in the previous example transactions 3 and 4 would only be considered fraud if they occurred within 1 minute of each other. +Flink's `KeyedProcessFunction` allows you to set timers which invoke a callback method at some point in time in the future. 
+ +Let's see how we can modify our Job to comply with our new requirements: + +* Whenever the flag is set to `true`, also set a timer for 1 minute in the future. +* When the timer fires, reset the flag by clearing its state. +* If the flag is ever cleared the timer should be canceled. + +To cancel a timer, you have to remember what time it is set for, and remembering implies state, so you will begin by creating a timer state along with your flag state. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} + private transient ValueState<Boolean> flagState; + private transient ValueState<Long> timerState; + + @Override + public void open(Configuration parameters) { + ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>( + "flag", + Types.BOOLEAN); + flagState = getRuntimeContext().getState(flagDescriptor); + + ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>( + "timer-state", + Types.LONG); + timerState = getRuntimeContext().getState(timerDescriptor); + } +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +@SerialVersionUID(1L) +class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] { + + @transient private var flagState: ValueState[java.lang.Boolean] = _ + @transient private var timerState: ValueState[java.lang.Long] = _ + + @throws[Exception] + override def open(parameters: Configuration): Unit = { + val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN) + flagState = getRuntimeContext.getState(flagDescriptor) + + val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG) + timerState = getRuntimeContext.getState(timerDescriptor) + } +{% endhighlight %} +</div> +</div> + +`KeyedProcessFunction#processElement` is called with a `Context` that contains a timer service. +The timer service can be used to query the current time, register timers, and delete timers. 
+With this, you can set a timer for 1 minute in the future every time the flag is set and store the timestamp in `timerState`. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +if (transaction.getAmount() < SMALL_AMOUNT) { + // set the flag to true + flagState.update(true); + + // set the timer and timer state + long timer = context.timerService().currentProcessingTime() + ONE_MINUTE; + context.timerService().registerProcessingTimeTimer(timer); + timerState.update(timer); +} +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) { + // set the flag to true + flagState.update(true) + + // set the timer and timer state + val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE + context.timerService.registerProcessingTimeTimer(timer) + timerState.update(timer) +} +{% endhighlight %} +</div> +</div> + +Processing time is wall clock time, and is determined by the system clock of the machine running the operator. + +When a timer fires, it calls `KeyedProcessFunction#onTimer`. +Overriding this method is how you can implement your callback to reset the flag. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +@Override +public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) { + // remove flag after 1 minute + timerState.clear(); + flagState.clear(); +} +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +override def onTimer( + timestamp: Long, + ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext, + out: Collector[Alert]): Unit = { + // remove flag after 1 minute + timerState.clear() + flagState.clear() +} +{% endhighlight %} +</div> +</div> + +Finally, to cancel the timer, you need to delete the registered timer and delete the timer state. 
+You can wrap this in a helper method and call this method instead of `flagState.clear()`. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +private void cleanUp(Context ctx) throws Exception { + // delete timer + Long timer = timerState.value(); + ctx.timerService().deleteProcessingTimeTimer(timer); + + // clean up all state + timerState.clear(); + flagState.clear(); +} +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +@throws[Exception] +private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = { + // delete timer + val timer = timerState.value + ctx.timerService.deleteProcessingTimeTimer(timer) + + // clean up all states + timerState.clear() + flagState.clear() +} +{% endhighlight %} +</div> +</div> + +And that's it, a fully functional, stateful, distributed streaming application! + +## Final Application + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +package spendreport; + +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.walkthrough.common.entity.Alert; +import org.apache.flink.walkthrough.common.entity.Transaction; + +public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> { + + private static final long serialVersionUID = 1L; + + private static final double SMALL_AMOUNT = 1.00; + private static final double LARGE_AMOUNT = 500.00; + private static final long ONE_MINUTE = 60 * 1000; + + private transient ValueState<Boolean> flagState; + private transient ValueState<Long> timerState; + + @Override + public void open(Configuration parameters) { + 
ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>( + "flag", + Types.BOOLEAN); + flagState = getRuntimeContext().getState(flagDescriptor); + + ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>( + "timer-state", + Types.LONG); + timerState = getRuntimeContext().getState(timerDescriptor); + } + + @Override + public void processElement( + Transaction transaction, + Context context, + Collector<Alert> collector) throws Exception { + + // Get the current state for the current key + Boolean lastTransactionWasSmall = flagState.value(); + + // Check if the flag is set + if (lastTransactionWasSmall != null) { + if (transaction.getAmount() > LARGE_AMOUNT) { + //Output an alert downstream + Alert alert = new Alert(); + alert.setId(transaction.getAccountId()); + + collector.collect(alert); + } + // Clean up our state + cleanUp(context); + } + + if (transaction.getAmount() < SMALL_AMOUNT) { + // set the flag to true + flagState.update(true); + + long timer = context.timerService().currentProcessingTime() + ONE_MINUTE; + context.timerService().registerProcessingTimeTimer(timer); + + timerState.update(timer); + } + } + + @Override + public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) { + // remove flag after 1 minute + timerState.clear(); + flagState.clear(); + } + + private void cleanUp(Context ctx) throws Exception { + // delete timer + Long timer = timerState.value(); + ctx.timerService().deleteProcessingTimeTimer(timer); + + // clean up all state + timerState.clear(); + flagState.clear(); + } +} +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +package spendreport + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.scala.typeutils.Types +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.KeyedProcessFunction +import org.apache.flink.util.Collector +import 
org.apache.flink.walkthrough.common.entity.Alert +import org.apache.flink.walkthrough.common.entity.Transaction + +object FraudDetector { + val SMALL_AMOUNT: Double = 1.00 + val LARGE_AMOUNT: Double = 500.00 + val ONE_MINUTE: Long = 60 * 1000L +} + +@SerialVersionUID(1L) +class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] { + + @transient private var flagState: ValueState[java.lang.Boolean] = _ + @transient private var timerState: ValueState[java.lang.Long] = _ + + @throws[Exception] + override def open(parameters: Configuration): Unit = { + val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN) + flagState = getRuntimeContext.getState(flagDescriptor) + + val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG) + timerState = getRuntimeContext.getState(timerDescriptor) + } + + override def processElement( + transaction: Transaction, + context: KeyedProcessFunction[Long, Transaction, Alert]#Context, + collector: Collector[Alert]): Unit = { + + // Get the current state for the current key + val lastTransactionWasSmall = flagState.value + + // Check if the flag is set + if (lastTransactionWasSmall != null) { + if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) { + // Output an alert downstream + val alert = new Alert + alert.setId(transaction.getAccountId) + + collector.collect(alert) + } + // Clean up our state + cleanUp(context) + } + + if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) { + // set the flag to true + flagState.update(true) + val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE + + context.timerService.registerProcessingTimeTimer(timer) + timerState.update(timer) + } + } + + override def onTimer( + timestamp: Long, + ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext, + out: Collector[Alert]): Unit = { + // remove flag after 1 minute + timerState.clear() + flagState.clear() + } + + @throws[Exception] + private def cleanUp(ctx: 
KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = { + // delete timer + val timer = timerState.value + ctx.timerService.deleteProcessingTimeTimer(timer) + + // clean up all states + timerState.clear() + flagState.clear() + } +} +{% endhighlight %} +</div> +</div> + +### Expected Output + +Running this code with the provided `TransactionSource` will emit fraud alerts for account 3. +You should see the following output in your task manager logs: + +{% highlight bash %} +2019-08-19 14:22:06,220 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3} +2019-08-19 14:22:11,383 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3} +2019-08-19 14:22:16,551 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3} +2019-08-19 14:22:21,723 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3} +2019-08-19 14:22:26,896 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3} +{% endhighlight %} diff --git a/docs/getting-started/walkthroughs/table_api.md b/docs/getting-started/walkthroughs/table_api.md index 878fb54..f0cbb62 100644 --- a/docs/getting-started/walkthroughs/table_api.md +++ b/docs/getting-started/walkthroughs/table_api.md @@ -3,7 +3,7 @@ title: "Table API" nav-id: tableapiwalkthrough nav-title: 'Table API' nav-parent_id: walkthroughs -nav-pos: 1 +nav-pos: 2 --- <!-- Licensed to the Apache Software Foundation (ASF) under one diff --git a/docs/getting-started/walkthroughs/table_api.zh.md b/docs/getting-started/walkthroughs/table_api.zh.md index 878fb54..f0cbb62 100644 --- a/docs/getting-started/walkthroughs/table_api.zh.md +++ b/docs/getting-started/walkthroughs/table_api.zh.md @@ -3,7 +3,7 @@ title: "Table API" nav-id: tableapiwalkthrough nav-title: 'Table API' nav-parent_id: walkthroughs -nav-pos: 1 +nav-pos: 2 --- <!-- Licensed to the Apache Software Foundation (ASF) under one diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh 
index 3b34b560..9307188 100755 --- a/flink-end-to-end-tests/run-nightly-tests.sh +++ b/flink-end-to-end-tests/run-nightly-tests.sh @@ -140,6 +140,8 @@ run_test "Quickstarts Scala nightly end-to-end test" "$END_TO_END_DIR/test-scrip run_test "Walkthrough Table Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_table_walkthroughs.sh java" run_test "Walkthrough Table Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_table_walkthroughs.sh scala" +run_test "Walkthrough DataStream Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_datastream_walkthroughs.sh java" +run_test "Walkthrough DataStream Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_datastream_walkthroughs.sh scala" run_test "Test PubSub connector with Docker based Google PubSub Emulator" "$END_TO_END_DIR/test-scripts/test_streaming_gcp_pubsub.sh" diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh index 2dc3787..bdecb32 100644 --- a/flink-end-to-end-tests/test-scripts/common.sh +++ b/flink-end-to-end-tests/test-scripts/common.sh @@ -745,3 +745,15 @@ function retry_times() { return 1 } +JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)" + +function extract_job_id_from_job_submission_return() { + if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]]; + then + JOB_ID="${BASH_REMATCH[1]}"; + else + JOB_ID="" + fi + echo "$JOB_ID" +} + diff --git a/flink-end-to-end-tests/test-scripts/test_cli.sh b/flink-end-to-end-tests/test-scripts/test_cli.sh index b9d285b..4b04890 100755 --- a/flink-end-to-end-tests/test-scripts/test_cli.sh +++ b/flink-end-to-end-tests/test-scripts/test_cli.sh @@ -29,23 +29,12 @@ $FLINK_DIR/bin/taskmanager.sh start $FLINK_DIR/bin/taskmanager.sh start # CLI regular expressions -JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)" JOB_INFO_PACT_DATA_SOURCE_REGEX_EXTRACTOR="\"pact\": \"(Data Source)\"" JOB_INFO_PACT_DATA_SINK_REGEX_EXTRACTOR="\"pact\": \"(Data Sink)\"" 
JOB_LIST_REGEX_EXTRACTOR_BY_STATUS="([0-9,a-f]*) :" EXIT_CODE=0 -function extract_job_id_from_job_submission_return() { - if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]]; - then - JOB_ID="${BASH_REMATCH[1]}"; - else - JOB_ID="" - fi - echo "$JOB_ID" -} - function extract_valid_pact_from_job_info_return() { PACT_MATCH=0 if [[ $1 =~ $JOB_INFO_PACT_DATA_SOURCE_REGEX_EXTRACTOR ]]; diff --git a/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh b/flink-end-to-end-tests/test-scripts/test_datastream_walkthroughs.sh similarity index 75% copy from flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh copy to flink-end-to-end-tests/test-scripts/test_datastream_walkthroughs.sh index 77afc58..e976927 100755 --- a/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh +++ b/flink-end-to-end-tests/test-scripts/test_datastream_walkthroughs.sh @@ -19,7 +19,7 @@ # End to end test for quick starts test. # Usage: -# FLINK_DIR=<flink dir> flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh <Type (java or scala)> +# FLINK_DIR=<flink dir> flink-end-to-end-tests/test-scripts/test_datastream_walkthroughs.sh <Type (java or scala)> source "$(dirname "$0")"/common.sh @@ -28,12 +28,12 @@ TEST_TYPE=$1 mkdir -p "${TEST_DATA_DIR}" cd "${TEST_DATA_DIR}" -ARTIFACT_ID=flink-walkthrough-table-${TEST_TYPE} +ARTIFACT_ID=flink-walkthrough-datastream-${TEST_TYPE} ARTIFACT_VERSION=0.1 mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ - -DarchetypeArtifactId=flink-walkthrough-table-${TEST_TYPE} \ + -DarchetypeArtifactId=flink-walkthrough-datastream-${TEST_TYPE} \ -DarchetypeVersion=${FLINK_VERSION} \ -DgroupId=org.apache.flink.walkthrough \ -DartifactId=${ARTIFACT_ID} \ @@ -46,7 +46,8 @@ cd "${ARTIFACT_ID}" mvn clean package -nsu > compile-output.txt if [[ `grep -c "BUILD FAILURE" compile-output.txt` -eq '1' ]]; then - echo "Failure: The walk-through did not successfully compile" + echo "Failure: The walkthrough did not successfully compile" + cat 
compile-output.txt exit 1 fi @@ -67,8 +68,28 @@ fi TEST_PROGRAM_JAR=${TEST_DATA_DIR}/${ARTIFACT_ID}/target/${ARTIFACT_ID}-${ARTIFACT_VERSION}.jar -add_optional_lib "table" - start_cluster -${FLINK_DIR}/bin/flink run "$TEST_PROGRAM_JAR" +JOB_ID="" +EXIT_CODE=0 + +RETURN=`$FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR` +echo "$RETURN" +JOB_ID=`extract_job_id_from_job_submission_return "$RETURN"` +EXIT_CODE=$? # expect matching job id extraction + +if [ $EXIT_CODE == 0 ]; then + RETURN=`$FLINK_DIR/bin/flink list -r` + echo "$RETURN" + if [[ `grep -c "$JOB_ID" "$RETURN"` -eq '1' ]]; then # expect match for running job + echo "[FAIL] Unable to submit walkthrough" + EXIT_CODE=1 + fi +fi + +if [ $EXIT_CODE == 0 ]; then + eval "$FLINK_DIR/bin/flink cancel ${JOB_ID}" + EXIT_CODE=$? +fi + +exit $EXIT_CODE diff --git a/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh b/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh index 77afc58..f53e972 100755 --- a/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh +++ b/flink-end-to-end-tests/test-scripts/test_table_walkthroughs.sh @@ -47,6 +47,7 @@ mvn clean package -nsu > compile-output.txt if [[ `grep -c "BUILD FAILURE" compile-output.txt` -eq '1' ]]; then echo "Failure: The walk-through did not successfully compile" + cat compile-output.txt exit 1 fi diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/entity/Alert.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/entity/Alert.java new file mode 100644 index 0000000..9678fdb --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/entity/Alert.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.walkthrough.common.entity; + +import java.util.Objects; + +/** + * A simple alert event. + */ +@SuppressWarnings("unused") +public final class Alert { + + private long id; + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } else if (o == null || getClass() != o.getClass()) { + return false; + } + Alert alert = (Alert) o; + return id == alert.id; + } + + @Override + public int hashCode() { + return Objects.hash(id); + } + + @Override + public String toString() { + return "Alert{" + + "id=" + id + + '}'; + } +} diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/sink/AlertSink.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/sink/AlertSink.java new file mode 100644 index 0000000..4332a8e --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/sink/AlertSink.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.walkthrough.common.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.walkthrough.common.entity.Alert; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A sink for outputting alerts. + */ +@PublicEvolving +@SuppressWarnings("unused") +public class AlertSink implements SinkFunction<Alert> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(AlertSink.class); + + @Override + public void invoke(Alert value, Context context) { + LOG.info(value.toString()); + } +} diff --git a/flink-walkthroughs/flink-walkthrough-datastream-java/pom.xml b/flink-walkthroughs/flink-walkthrough-datastream-java/pom.xml new file mode 100644 index 0000000..8788a0f --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-datastream-java/pom.xml @@ -0,0 +1,37 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-walkthroughs</artifactId> + <version>1.10-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-walkthrough-datastream-java</artifactId> + <packaging>maven-archetype</packaging> + +</project> diff --git a/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/META-INF/maven/archetype-metadata.xml b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/META-INF/maven/archetype-metadata.xml new file mode 100644 index 0000000..ed235f5 --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/META-INF/maven/archetype-metadata.xml @@ -0,0 +1,36 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<archetype-descriptor + xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd" + name="flink-walkthrough-datastream-java"> + <fileSets> + <fileSet filtered="true" packaged="true" encoding="UTF-8"> + <directory>src/main/java</directory> + <includes> + <include>**/*.java</include> + </includes> + </fileSet> + <fileSet encoding="UTF-8"> + <directory>src/main/resources</directory> + </fileSet> + </fileSets> +</archetype-descriptor> diff --git a/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/pom.xml b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/pom.xml new file mode 100644 index 0000000..8229d78 --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/pom.xml @@ -0,0 +1,225 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>${groupId}</groupId> + <artifactId>${artifactId}</artifactId> + <version>${version}</version> + <packaging>jar</packaging> + + <name>Flink Walkthrough DataStream Java</name> + <url>https://flink.apache.org</url> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <flink.version>@project.version@</flink.version> + <java.version>1.8</java.version> + <scala.binary.version>2.11</scala.binary.version> + <maven.compiler.source>${java.version}</maven.compiler.source> + <maven.compiler.target>${java.version}</maven.compiler.target> + </properties> + + <repositories> + <repository> + <id>apache.snapshots</id> + <name>Apache Development Snapshot Repository</name> + <url>https://repository.apache.org/content/repositories/snapshots/</url> + <releases> + <enabled>false</enabled> + </releases> + <snapshots> + <enabled>true</enabled> + </snapshots> + </repository> + </repositories> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + </dependency> + + <!-- This dependency is provided, because it should not be packaged into the JAR file. 
--> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <!-- Add connector dependencies here. They must be in the default scope (compile). --> + + <!-- Example: + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + </dependency> + --> + + <!-- Add logging framework, to produce console output when running in the IDE. --> + <!-- These dependencies are excluded from the application JAR by default. --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>1.7.7</version> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <version>1.2.17</version> + <scope>runtime</scope> + </dependency> + </dependencies> + + <build> + <plugins> + + <!-- Java Compiler --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.1</version> + <configuration> + <source>${java.version}</source> + <target>${java.version}</target> + </configuration> + </plugin> + + <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. --> + <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. 
--> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>3.0.0</version> + <executions> + <!-- Run shade goal on package phase --> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <artifactSet> + <excludes> + <exclude>org.apache.flink:force-shading</exclude> + <exclude>com.google.code.findbugs:jsr305</exclude> + <exclude>org.slf4j:*</exclude> + <exclude>log4j:*</exclude> + </excludes> + </artifactSet> + <filters> + <filter> + <!-- Do not copy the signatures in the META-INF folder. + Otherwise, this might cause SecurityExceptions when using the JAR. --> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>${package}.FraudDetectionJob</mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + + <pluginManagement> + <plugins> + + <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. 
--> + <plugin> + <groupId>org.eclipse.m2e</groupId> + <artifactId>lifecycle-mapping</artifactId> + <version>1.0.0</version> + <configuration> + <lifecycleMappingMetadata> + <pluginExecutions> + <pluginExecution> + <pluginExecutionFilter> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <versionRange>[3.0.0,)</versionRange> + <goals> + <goal>shade</goal> + </goals> + </pluginExecutionFilter> + <action> + <ignore/> + </action> + </pluginExecution> + <pluginExecution> + <pluginExecutionFilter> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <versionRange>[3.1,)</versionRange> + <goals> + <goal>testCompile</goal> + <goal>compile</goal> + </goals> + </pluginExecutionFilter> + <action> + <ignore/> + </action> + </pluginExecution> + </pluginExecutions> + </lifecycleMappingMetadata> + </configuration> + </plugin> + </plugins> + </pluginManagement> + </build> + + <!-- This profile helps to make things run out of the box in IntelliJ --> + <!-- Its adds Flink's core classes to the runtime class path. 
--> + <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' --> + <profiles> + <profile> + <id>add-dependencies-for-IDEA</id> + + <activation> + <property> + <name>idea.version</name> + </property> + </activation> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>compile</scope> + </dependency> + </dependencies> + </profile> + </profiles> + +</project> diff --git a/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetectionJob.java b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetectionJob.java new file mode 100644 index 0000000..46019fd --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetectionJob.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package ${package}; + +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.walkthrough.common.sink.AlertSink; +import org.apache.flink.walkthrough.common.entity.Alert; +import org.apache.flink.walkthrough.common.entity.Transaction; +import org.apache.flink.walkthrough.common.source.TransactionSource; + +/** + * Skeleton code for the datastream walkthrough + */ +public class FraudDetectionJob { + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Transaction> transactions = env + .addSource(new TransactionSource()) + .name("transactions"); + + DataStream<Alert> alerts = transactions + .keyBy(Transaction::getAccountId) + .process(new FraudDetector()) + .name("fraud-detector"); + + alerts + .addSink(new AlertSink()) + .name("send-alerts"); + + env.execute("Fraud Detection"); + } +} diff --git a/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetector.java b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetector.java new file mode 100644 index 0000000..e5c034c --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/java/FraudDetector.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ${package}; + +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.walkthrough.common.entity.Alert; +import org.apache.flink.walkthrough.common.entity.Transaction; + +/** + * Skeleton code for implementing a fraud detector. + */ +public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> { + + private static final long serialVersionUID = 1L; + + private static final double SMALL_AMOUNT = 1.00; + private static final double LARGE_AMOUNT = 500.00; + private static final long ONE_MINUTE = 60 * 1000; + + @Override + public void processElement( + Transaction transaction, + Context context, + Collector<Alert> collector) throws Exception { + + Alert alert = new Alert(); + alert.setId(transaction.getAccountId()); + + collector.collect(alert); + } +} diff --git a/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/resources/log4j.properties b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/resources/log4j.properties new file mode 100644 index 0000000..8be9b9a --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-datastream-java/src/main/resources/archetype-resources/src/main/resources/log4j.properties @@ -0,0 +1,24 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=Warn, console +log4j.logger.org.apache.flink.walkthrough.common.sink.AlertSink=info + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/pom.xml b/flink-walkthroughs/flink-walkthrough-datastream-scala/pom.xml new file mode 100644 index 0000000..3e35e27 --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-datastream-scala/pom.xml @@ -0,0 +1,37 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-walkthroughs</artifactId> + <version>1.10-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-walkthrough-datastream-scala</artifactId> + <packaging>maven-archetype</packaging> + +</project> diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/META-INF/maven/archetype-metadata.xml b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/META-INF/maven/archetype-metadata.xml new file mode 100644 index 0000000..e78dc8d --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/META-INF/maven/archetype-metadata.xml @@ -0,0 +1,36 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<archetype-descriptor + xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd" + name="flink-walkthrough-datastream-scala"> + <fileSets> + <fileSet filtered="true" packaged="true" encoding="UTF-8"> + <directory>src/main/scala</directory> + <includes> + <include>**/*.scala</include> + </includes> + </fileSet> + <fileSet encoding="UTF-8"> + <directory>src/main/resources</directory> + </fileSet> + </fileSets> +</archetype-descriptor> diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/pom.xml b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/pom.xml new file mode 100644 index 0000000..93956fa --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/pom.xml @@ -0,0 +1,256 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>${groupId}</groupId> + <artifactId>${artifactId}</artifactId> + <version>${version}</version> + <packaging>jar</packaging> + + <name>Flink Walkthrough DataStream Scala</name> + <url>https://flink.apache.org</url> + + <repositories> + <repository> + <id>apache.snapshots</id> + <name>Apache Development Snapshot Repository</name> + <url>https://repository.apache.org/content/repositories/snapshots/</url> + <releases> + <enabled>false</enabled> + </releases> + <snapshots> + <enabled>true</enabled> + </snapshots> + </repository> + </repositories> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <flink.version>@project.version@</flink.version> + <scala.binary.version>2.11</scala.binary.version> + <scala.version>2.11.12</scala.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + </dependency> + + <!-- Apache Flink dependencies --> + <!-- These dependencies are provided, because they should not be packaged into the JAR file. 
--> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <!-- Add connector dependencies here. They must be in the default scope (compile). --> + + <!-- Example: + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + </dependency> + --> + + <!-- Add logging framework, to produce console output when running in the IDE. --> + <!-- These dependencies are excluded from the application JAR by default. --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>1.7.7</version> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <version>1.2.17</version> + <scope>runtime</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. --> + <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>3.0.0</version> + <executions> + <!-- Run shade goal on package phase --> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <artifactSet> + <excludes> + <exclude>org.apache.flink:force-shading</exclude> + <exclude>com.google.code.findbugs:jsr305</exclude> + <exclude>org.slf4j:*</exclude> + <exclude>log4j:*</exclude> + </excludes> + </artifactSet> + <filters> + <filter> + <!-- Do not copy the signatures in the META-INF folder. + Otherwise, this might cause SecurityExceptions when using the JAR. 
--> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>${package}.FraudDetectionJob</mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + + <!-- Java Compiler --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.1</version> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + + <!-- Scala Compiler --> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>3.2.2</version> + <executions> + <execution> + <goals> + <goal>compile</goal> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + </plugin> + + <!-- Eclipse Scala Integration --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-eclipse-plugin</artifactId> + <version>2.8</version> + <configuration> + <downloadSources>true</downloadSources> + <projectnatures> + <projectnature>org.scala-ide.sdt.core.scalanature</projectnature> + <projectnature>org.eclipse.jdt.core.javanature</projectnature> + </projectnatures> + <buildcommands> + <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand> + </buildcommands> + <classpathContainers> + <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer> + <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> + </classpathContainers> + <excludes> + <exclude>org.scala-lang:scala-library</exclude> + <exclude>org.scala-lang:scala-compiler</exclude> + </excludes> + <sourceIncludes> + <sourceInclude>**/*.scala</sourceInclude> + <sourceInclude>**/*.java</sourceInclude> + </sourceIncludes> + 
</configuration> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.7</version> + <executions> + <!-- Add src/main/scala to eclipse build path --> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/main/scala</source> + </sources> + </configuration> + </execution> + <!-- Add src/test/scala to eclipse build path --> + <execution> + <id>add-test-source</id> + <phase>generate-test-sources</phase> + <goals> + <goal>add-test-source</goal> + </goals> + <configuration> + <sources> + <source>src/test/scala</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <!-- This profile helps to make things run out of the box in IntelliJ --> + <!-- Its adds Flink's core classes to the runtime class path. --> + <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' --> + <profiles> + <profile> + <id>add-dependencies-for-IDEA</id> + + <activation> + <property> + <name>idea.version</name> + </property> + </activation> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>compile</scope> + </dependency> + </dependencies> + </profile> + </profiles> + +</project> diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/resources/log4j.properties b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/resources/log4j.properties new file mode 100644 index 0000000..8be9b9a --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/resources/log4j.properties @@ -0,0 +1,24 @@ 
+################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=Warn, console +log4j.logger.org.apache.flink.walkthrough.common.sink.AlertSink=info + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetectionJob.scala b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetectionJob.scala new file mode 100644 index 0000000..58e46e2 --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetectionJob.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ${package} + +import org.apache.flink.streaming.api.scala._ +import org.apache.flink.walkthrough.common.sink.AlertSink +import org.apache.flink.walkthrough.common.entity.Alert +import org.apache.flink.walkthrough.common.entity.Transaction +import org.apache.flink.walkthrough.common.source.TransactionSource + +/** + * Skeleton code for the DataStream code walkthrough + */ +object FraudDetectionJob { + + @throws[Exception] + def main(args: Array[String]): Unit = { + val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment + + val transactions: DataStream[Transaction] = env + .addSource(new TransactionSource) + .name("transactions") + + val alerts: DataStream[Alert] = transactions + .keyBy(transaction => transaction.getAccountId) + .process(new FraudDetector) + .name("fraud-detector") + + alerts + .addSink(new AlertSink) + .name("send-alerts") + + env.execute("Fraud Detection") + } +} diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetector.scala b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetector.scala new file mode 100644 index 0000000..6d7d91d --- /dev/null +++ b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetector.scala @@ 
-0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ${package} + +import org.apache.flink.streaming.api.functions.KeyedProcessFunction +import org.apache.flink.util.Collector +import org.apache.flink.walkthrough.common.entity.Alert +import org.apache.flink.walkthrough.common.entity.Transaction + +/** + * Skeleton code for implementing a fraud detector. + */ +object FraudDetector { + val SMALL_AMOUNT: Double = 1.00 + val LARGE_AMOUNT: Double = 500.00 + val ONE_MINUTE: Long = 60 * 1000L +} + +@SerialVersionUID(1L) +class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] { + + @throws[Exception] + def processElement( + transaction: Transaction, + context: KeyedProcessFunction[Long, Transaction, Alert]#Context, + collector: Collector[Alert]): Unit = { + + val alert = new Alert + alert.setId(transaction.getAccountId) + + collector.collect(alert) + } +} diff --git a/flink-walkthroughs/pom.xml b/flink-walkthroughs/pom.xml index 2733f59..ce499e7 100644 --- a/flink-walkthroughs/pom.xml +++ b/flink-walkthroughs/pom.xml @@ -36,6 +36,8 @@ under the License. 
<module>flink-walkthrough-common</module> <module>flink-walkthrough-table-java</module> <module>flink-walkthrough-table-scala</module> + <module>flink-walkthrough-datastream-java</module> + <module>flink-walkthrough-datastream-scala</module> </modules> <build> <extensions>