infoverload commented on a change in pull request #18055: URL: https://github.com/apache/flink/pull/18055#discussion_r774011630
########## File path: docs/content/docs/try-flink/write_flink_program_with_sql.md ##########

@@ -0,0 +1,269 @@
---
title: 'Write your first Flink program with SQL'
weight: 2
type: docs
aliases:
  - /try-flink/write_flink_program_with_sql.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Write your first Flink program with SQL

## Introduction

Flink features [multiple APIs]({{< ref "docs/concepts/overview" >}}) with different levels of abstraction that you can use to develop your streaming application. SQL is the highest level of abstraction and is supported by Flink as a relational API for batch and stream processing. This means that you can write the same queries on unbounded real-time streams and on bounded recorded streams and produce the same results.

SQL on Flink is based on [Apache Calcite](https://calcite.apache.org/), which implements standard SQL, and is commonly used to ease the implementation of data analytics, data pipelining, and ETL applications. It is a great entry point for writing your first Flink application and requires no Java or Python code.

This tutorial will guide you through writing your first Flink program using SQL alone. Through this exercise you will see how quickly and easily you can analyze streaming data with Flink!


## Goals

This tutorial will teach you how to:

- use the Flink SQL client to submit queries
- consume a data source with Flink SQL
- run a continuous query on a stream of data
- use Flink SQL to write out results to persistent storage


## Prerequisites

You only need basic knowledge of SQL to follow along.


## Step 1: Start the Flink SQL client

The [SQL Client]({{< ref "docs/dev/table/sqlClient" >}}) is bundled in the regular Flink distribution and can be run out-of-the-box. It requires only a running Flink cluster where table programs can be executed (since Flink SQL is a thin abstraction over the Table API).

There are many ways to set up Flink, but for this tutorial you will run it locally. [Download Flink]({{< ref "docs/try-flink/local_installation#downloading-flink" >}}) and [start a local cluster]({{< ref "docs/try-flink/local_installation#starting-and-stopping-a-local-cluster" >}}) with one worker (the TaskManager).

The script for the SQL client is located in the `/bin` directory of Flink. You can start the client by executing:

```sh
./bin/sql-client.sh
```

You should see something like this:

{{< img src="/fig/try-flink/flink-sql.png" alt="Flink SQL client" >}}
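If you want to quickly check that the client can talk to the cluster, you can run a trivial query that does not read from any table. This is just an optional smoke test (recent Flink versions support `SELECT` without a `FROM` clause), and the exact output formatting may differ between versions:

```sql
-- a minimal sanity check: returns a single row with a single column
SELECT 'Hello, Flink SQL!';
```

You can leave the SQL client at any time by typing `QUIT;`.
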
## Step 2: Set up a data source with flink-faker

As with any Flink program, you need a data source to connect to so that Flink has something to process.
There are many popular data sources, but for this tutorial you will be using [flink-faker](https://github.com/knaufk/flink-faker). This custom [table source]({{< ref "docs/connectors/table/overview" >}}) is based on [Java Faker](https://github.com/DiUS/java-faker) and continuously generates realistic-looking fake data in memory.

Java Faker is the tool that generates this data, and flink-faker exposes it as a source in Flink by implementing the [DynamicTableSource interface]({{< javadoc file="org/apache/flink/table/connector/source/DynamicTableSource.html" name="DynamicTableSource">}}). The dynamic table source contains the logic for creating the table source (in this case, from flink-faker), and registering a [factory]({{< javadoc file="org/apache/flink/table/factories/DynamicTableSourceFactory.html" name="DynamicTableSourceFactory">}}) for it makes it available in the SQL API via `'connector' = 'faker'`.

The next step is to make `flink-faker` discoverable by the Flink SQL client by following these [instructions](https://github.com/knaufk/flink-faker#adding-flink-faker-to-flink-sql-client).

Once you have done that, confirm that the factory is loaded by creating a table backed by this table source. Execute the following statement in the SQL client:

```sql
CREATE TABLE test WITH ('connector' = 'faker');
```

If you see `[INFO] Execute statement succeed.`, then the table source has been loaded correctly.

You are ready to start writing your first Flink program with SQL.


## Step 3: Consume the data via SQL

For this tutorial, you are going to create a table that models a [Twitch](https://www.twitch.tv) gaming stream. This table will contain the following fields: user_name, game_name, viewer_count, started_at, location, and a processing-time attribute.

Use the [DDL syntax](https://www.geeksforgeeks.org/sql-ddl-dql-dml-dcl-tcl-commands/) `CREATE TABLE` to create a table with these fields. You will also use the `WITH` clause to configure the connector (i.e. flink-faker).

Execute the following statement in the SQL client:

```sql
CREATE TEMPORARY TABLE twitch_stream (
  `user_name` VARCHAR(2147483647),
  `game_name` VARCHAR(2147483647),
  `viewer_count` INT,
  `started_at` TIMESTAMP_LTZ(3),
  `location` VARCHAR(2147483647),
  proctime AS PROCTIME()
) WITH (
  'connector' = 'faker',
  'fields.user_name.expression' = '#{name.fullName}',
  'fields.game_name.expression' = '#{book.title}',
  'fields.viewer_count.expression' = '#{number.numberBetween ''0'',''100''}',
  'fields.started_at.expression' = '#{date.past ''15'',''SECONDS''}',
  'fields.location.expression' = '#{country.name}'
);
```

You should again see `[INFO] Execute statement succeed.`

Note that a temporary table is not registered in a catalog and is only available within the current session.

Let's delve a bit into the notion of time. Within data processing systems, there are typically two types of time to reason about: event time (the time at which events actually occurred) and processing time (the time at which events are observed in the system). The latter is the [simplest notion of time]({{< ref "docs/dev/table/concepts/time_attributes#processing-time" >}}) and is what you will be using via `PROCTIME()`.
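If you are curious about event time, the table could instead declare a watermark on the `started_at` column so that `started_at` becomes the event time attribute. The following variant is only an illustrative sketch and is not needed for the rest of this tutorial; the table name and the 5 second out-of-orderness bound are arbitrary choices:

```sql
-- illustrative only: the same faker source, but with an event time attribute
CREATE TEMPORARY TABLE twitch_stream_event_time (
  `user_name` STRING,
  `game_name` STRING,
  `viewer_count` INT,
  `started_at` TIMESTAMP_LTZ(3),
  `location` STRING,
  -- declare started_at as the event time attribute, tolerating events
  -- that arrive up to 5 seconds out of order
  WATERMARK FOR `started_at` AS `started_at` - INTERVAL '5' SECOND
) WITH (
  'connector' = 'faker',
  'fields.user_name.expression' = '#{name.fullName}',
  'fields.game_name.expression' = '#{book.title}',
  'fields.viewer_count.expression' = '#{number.numberBetween ''0'',''100''}',
  'fields.started_at.expression' = '#{date.past ''15'',''SECONDS''}',
  'fields.location.expression' = '#{country.name}'
);
```

For the rest of the tutorial you will stick with processing time.
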
## Step 4: Run your first continuous SQL query and learn about dynamic tables

Now use the DQL syntax `SELECT` to view the streaming data in this table:

```sql
SELECT * FROM twitch_stream;
```

On the console of the SQL client, you should see a stream of data populating each of the defined fields of the `twitch_stream` table. Notice that this table contains a stream of data and is what is known as a [dynamic table]({{< ref "docs/dev/table/concepts/dynamic_tables" >}}).

Dynamic tables are the fundamental concept behind Flink's SQL support for streaming data. While SQL makes it seem like you are querying a database (or static tables that represent batch data), the tables in Flink are actually dynamic tables that are defined by queries. So instead of running several different queries on the same set of data, you are continuously running one query on a dataset that keeps changing.

Under the hood, the SQL client submits queries to Flink's JobManager, which works with the TaskManager(s) to assign and monitor query tasks. Have a look at [Flink's architecture]({{< ref "docs/concepts/flink-architecture" >}}) for more detail.


## Step 5: Filter the data

Now perform a filter operation on this data stream to select a subset of the data using the `WHERE` keyword.

To find out which gaming streams started in the last 15 minutes, try this query:

```sql
SELECT game_name,
       started_at,
       proctime,
       TIMESTAMPDIFF(MINUTE, started_at, proctime)
FROM twitch_stream
WHERE TIMESTAMPDIFF(MINUTE, started_at, proctime) < 15;
```

You should now see a new dynamic table containing only the filtered results.


Now try a [Top-N]({{< ref "docs/dev/table/sql/queries/topn" >}}) query to find the 10 most popular games.

Top-N queries identify the N smallest or largest values (as ordered by some attribute of the table), and are useful when you need to identify the top (or bottom) N items in a stream.

```sql
SELECT game_name, proctime, viewer_count
FROM (
  SELECT *, ROW_NUMBER() OVER (ORDER BY viewer_count DESC) AS ranking
  FROM (
    SELECT *
    FROM (
      SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY game_name ORDER BY proctime DESC) AS ordering
      FROM twitch_stream
    ) WHERE ordering = 1
  )
) WHERE ranking <= 10;
```

Note that Flink expresses a Top-N query as the combination of an OVER window clause and a filter condition, which makes it possible to rank an otherwise unbounded stream.

You should now see a new dynamic table with the results from this query. Notice how the top 10 games are constantly being revised as new data is processed.
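As an optional exercise, you can combine the two ideas from this step in a single query. The sketch below is only an illustration: it ranks the individual stream records that started within the last 15 minutes, and since it skips the per-game deduplication used above, the same game may appear more than once in the result:

```sql
-- illustrative only: rank the records that passed the 15-minute filter
SELECT game_name, viewer_count, ranking
FROM (
  SELECT *,
         ROW_NUMBER() OVER (ORDER BY viewer_count DESC) AS ranking
  FROM twitch_stream
  WHERE TIMESTAMPDIFF(MINUTE, started_at, proctime) < 15
) WHERE ranking <= 3;
```
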
## Step 6: Aggregate the data and learn about windowing

Now try an aggregate function (`COUNT`, `SUM`, `AVG`, `MIN`, `MAX`) to find the most popular games per location with the following query:

```sql
SELECT location,
       game_name,
       COUNT(game_name)
FROM twitch_stream
WHERE location IS NOT NULL
GROUP BY location, game_name;
```

You should see a continuously changing result that tells you, for each country, which games are being streamed and how often each game has been observed.

For the last example, let's try a very common operation in streaming SQL: computing windowed aggregations.

Windows are the fundamental building blocks when it comes to processing infinite streams since they provide a way to split streams into finite chunks.
Flink SQL offers several [window functions]({{< ref "docs/dev/table/sql/queries/window-tvf" >}}) that you can use.

Let's explore how to apply one of these functions!
To calculate the total viewer count per game for every 1 minute window,
you can use the [tumble]({{< ref "docs/dev/table/sql/queries/window-tvf#tumble" >}}) function.

Tumbling windows can be thought of as mini-batches of aggregations over non-overlapping windows of time.

```sql
SELECT window_start,
       window_end,
       game_name,
       SUM(viewer_count) AS TotalViewerCount
  FROM TABLE(
    TUMBLE(TABLE twitch_stream, DESCRIPTOR(proctime), INTERVAL '1' MINUTE))
  GROUP BY window_start, window_end, game_name;
```

Fresh results will appear every minute, showing the sum of the viewer counts for each game.


## Step 7: Write the updated stream to persistent storage

Now that you have written a program with Flink SQL to process all that streaming data, you probably want to store the results somewhere. Since you are dealing with unbounded data sets, you can think of this as maintaining materialized views (or snapshots of the data) in external storage systems.

Flink does not provide its own storage system but instead offers many sink connectors you can use to write table updates to external systems.

To write a table out as CSV files, you can use the [FileSystem connector]({{< ref "docs/connectors/table/filesystem" >}}) that is built into Flink. This connector supports row-encoded and bulk-encoded formats. Row-encoded formats such as CSV and JSON write one row at a time to a file. Bulk-encoded formats collect a batch of rows in memory and organize them in a storage-and-scan-efficient format before writing out the data.

To store the results of the last example query (the tumbling window aggregation), you first need to create a table that will hold the results. Note that the `path` option points to a directory into which Flink writes the CSV files:

```sql
CREATE TABLE twitch_stream_viewer_count (
  `window_start` TIMESTAMP_LTZ(3),
  `window_end` TIMESTAMP_LTZ(3),
  `game_name` VARCHAR(2147483647),
  `TotalViewerCount` INT
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///path/to/output',
  'format' = 'csv'
);
```

Now insert the results of the tumbling window query into the file system table:

```sql
INSERT INTO twitch_stream_viewer_count
SELECT window_start,
       window_end,
       game_name,
       SUM(viewer_count) AS TotalViewerCount
  FROM TABLE(
    TUMBLE(TABLE twitch_stream, DESCRIPTOR(proctime), INTERVAL '1' MINUTE))
  GROUP BY window_start, window_end, game_name;
```

Review comment:

Perhaps leave it this way and then revisit it once we have that feature available?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
