infoverload commented on a change in pull request #18055:
URL: https://github.com/apache/flink/pull/18055#discussion_r765846110



##########
File path: docs/content/docs/try-flink/write_flink_program_with_sql.md
##########
@@ -0,0 +1,264 @@
+---
+title: 'Write your first Flink program with SQL'
+weight: 2 
+type: docs
+aliases:
+  - /try-flink/write_flink_program_with_sql.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Write your first Flink program with SQL
+
+## Introduction
+
+Flink features [multiple APIs]({{< ref "docs/concepts/overview" >}}) with 
different levels of abstraction that can be used to develop your streaming 
application. SQL is the highest level of abstraction and is supported by Flink 
as a relational API for batch and stream processing. This means that you can 
write the same queries on both unbounded real-time streams and bounded recorded 
streams and produce the same results. 
+
+SQL support in Flink is based on [Apache Calcite](https://calcite.apache.org/), which implements the SQL standard, and is commonly used to simplify data analytics, data pipelining, and ETL applications. It is a great entry point for writing your first Flink application and requires no Java or Python code.
+
+This tutorial will guide you through writing your first Flink program using SQL alone. Through this exercise, you will see how quickly and easily you can analyze streaming data with Flink!
+
+
+## Goals
+
+This tutorial will teach you how to:
+
+- use the Flink SQL client to submit queries 
+- consume a data source with Flink SQL
+- run a continuous query on a stream of data
+- use Flink SQL to write out results to persistent storage 
+
+
+## Prerequisites 
+
+You only need to have basic knowledge of SQL to follow along.
+
+
+## Step 1: Start the Flink SQL client 
+
+The [SQL Client]({{< ref "docs/dev/table/sqlClient" >}}) is bundled in the 
regular Flink distribution and can be run out-of-the-box. It requires only a 
running Flink cluster where table programs can be executed (since Flink SQL is 
a thin abstraction over the Table API). 
+
+There are many ways to set up Flink but you will run it locally for the 
purpose of this tutorial. [Download Flink]({{< ref 
"docs/try-flink/local_installation#downloading-flink" >}}) and [start a local 
cluster]({{< ref 
"docs/try-flink/local_installation#starting-and-stopping-a-local-cluster" >}}) 
with one worker (the TaskManager).  
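+
+If you have already downloaded and extracted a Flink distribution, starting the local cluster boils down to something like this (the directory name is a placeholder for whatever version you downloaded):
+
+```sh
+# move into the extracted Flink distribution (placeholder directory name)
+cd flink-<version>
+# start a local cluster with one JobManager and one TaskManager
+./bin/start-cluster.sh
+```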
+
+The scripts for the SQL client are located in the `/bin` directory of Flink. 
You can start the client by executing:
+
+```sh
+./bin/sql-client.sh
+```
+
+You should see something like this:
+
+{{< img src="/fig/try-flink/flink-sql.png" alt="Flink SQL client" >}}
+
+
+## Step 2: Set up a data source with flink-faker
+
+As with any Flink program, you need a data source for Flink to process. There are many popular data sources, but for this tutorial you will use [flink-faker](https://github.com/knaufk/flink-faker). This custom [table source]({{< ref "docs/connectors/table/overview" >}}) is based on [Java Faker](https://github.com/DiUS/java-faker) and continuously generates realistic-looking fake data in memory.
+
+Java Faker generates the fake data, and flink-faker exposes it as a source in Flink by implementing the [DynamicTableSource interface](https://nightlies.apache.org/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/DynamicTableSource.html). The dynamic table source contains the logic for creating the table source (in this case, one backed by flink-faker), and by registering a [factory](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/factories/DynamicTableSourceFactory.html) for it, the connector becomes available in the SQL API via `'connector' = 'faker'`.
+
+The next step is to make `flink-faker` discoverable by the Flink SQL client by 
following these 
[instructions](https://github.com/knaufk/flink-faker#adding-flink-faker-to-flink-sql-client).
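+
+For example, you can either copy the flink-faker jar into Flink's `lib/` directory and restart the cluster, or pass it to the SQL client at startup. A minimal sketch (the jar file name and version are placeholders for whatever you downloaded):
+
+```sh
+# restart the SQL client with the flink-faker jar on the classpath
+./bin/sql-client.sh --jar ./flink-faker-<version>.jar
+```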
+
+Once you have done that, confirm that the factory is loaded by creating a table with this table source. Execute the following statement in the SQL client:
+
+```sql
+CREATE TABLE test WITH ('connector' = 'faker');
+```
+
+If you see `[INFO] Execute statement succeed.`, then the table source has been 
loaded correctly.
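+
+Since the `test` table was only needed to verify the setup, you can optionally drop it again:
+
+```sql
+DROP TABLE test;
+```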
+
+You are ready to start writing your first Flink program with SQL. 
+
+
+## Step 3: Consume the data via SQL
+
+For this tutorial, you are going to create a table that models a [Twitch](https://www.twitch.tv) gaming stream. This table will contain the following fields: `user_name`, `game_name`, `viewer_count`, `started_at`, `location`, and a processing-time timestamp.
+
+Use the [DDL syntax](https://www.geeksforgeeks.org/sql-ddl-dql-dml-dcl-tcl-commands/) `CREATE TABLE` to create a table with these fields. You will also use the `WITH` clause to configure the connector (i.e., flink-faker).
+
+Execute the following query in the SQL client:
+
+```sql
+CREATE TEMPORARY TABLE twitch_stream (
+  `user_name` VARCHAR(2147483647),
+  `game_name` VARCHAR(2147483647),
+  `viewer_count` INT,
+  `started_at` TIMESTAMP_LTZ(3),
+  `location` VARCHAR(2147483647),
+  proctime AS PROCTIME()
+) WITH (
+  'connector' = 'faker',
+  'fields.user_name.expression' = '#{name.fullName}',
+  'fields.game_name.expression' = '#{book.title}',
+  'fields.viewer_count.expression' = '#{number.numberBetween ''0'',''100''}',
+  'fields.started_at.expression' = '#{date.past ''15'',''SECONDS''}',
+  'fields.location.expression' = '#{country.name}'
+);
+```
+
+You should see `[INFO] Execute statement succeed.` again.
+
+Note that a temporary table is not stored in a catalog and is only available for the duration of the current session.
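+
+You can double-check the schema that the client registered for the table at any time:
+
+```sql
+DESCRIBE twitch_stream;
+```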
+
+Let's delve a bit into the notion of time. Within data processing systems, there are typically two types of time to reason about: event time (the time at which events actually occurred) and processing time (the time at which events are observed in the system). The latter is the [simplest notion of time]({{< ref "docs/dev/table/concepts/time_attributes#processing-time" >}}) and is what you will be using via `PROCTIME()`.
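+
+For contrast, if you wanted to work with event time instead, you would declare a watermark on a timestamp column when creating the table. A minimal sketch of what that could look like (not needed for this tutorial; the table name is just illustrative):
+
+```sql
+CREATE TEMPORARY TABLE twitch_stream_event_time (
+  `game_name` VARCHAR(2147483647),
+  `viewer_count` INT,
+  `started_at` TIMESTAMP_LTZ(3),
+  -- use started_at as the event-time attribute, tolerating 5 seconds of out-of-order data
+  WATERMARK FOR `started_at` AS `started_at` - INTERVAL '5' SECOND
+) WITH (
+  'connector' = 'faker',
+  'fields.game_name.expression' = '#{book.title}',
+  'fields.viewer_count.expression' = '#{number.numberBetween ''0'',''100''}',
+  'fields.started_at.expression' = '#{date.past ''15'',''SECONDS''}'
+);
+```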
+
+
+## Step 4: Run your first continuous SQL query and learn about dynamic tables
+
+Now use the DQL syntax `SELECT` to view the streaming data in this table:
+
+```sql
+SELECT * FROM twitch_stream;
+```
+
+In the console of the SQL client, you should see a stream of data populating each of the defined fields of the `twitch_stream` table. This table is backed by a stream of data and is what is known as a [dynamic table]({{< ref "docs/dev/table/concepts/dynamic_tables" >}}).
+
+Dynamic tables are the fundamental concept behind Flink's SQL support for streaming data. While SQL makes it seem like you are querying a database (or static tables that represent batch data), the tables in Flink are actually dynamic tables that are defined by queries. So instead of running several different queries on the same set of data, you are continuously running one query on a dataset that keeps changing.
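+
+If you are curious how Flink translates such a query into a streaming job, you can ask for its plan (an optional side step; the output is fairly low-level and not needed to follow along):
+
+```sql
+EXPLAIN PLAN FOR SELECT * FROM twitch_stream;
+```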
+
+Under the hood, the SQL client submits queries to Flink's JobManager, which 
works with the TaskManager(s) to assign and monitor query tasks. Have a look at 
[Flink's architecture]({{< ref "docs/concepts/flink-architecture" >}}) for more 
detail. 
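+
+If you started a local cluster as described in Step 1, you can watch these jobs being submitted and executed in Flink's Web UI, which is available at `http://localhost:8081` by default.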
+
+
+## Step 5: Filter the data
+
+Now try filtering this data stream down to a subset of the data by using the `WHERE` clause.
+
+To find out which gaming streams started in the last 15 minutes, try this query:
+
+```sql
+SELECT game_name, 
+       started_at,
+       proctime, 
+       TIMESTAMPDIFF(MINUTE, started_at, proctime)
+FROM twitch_stream
+WHERE TIMESTAMPDIFF(MINUTE, started_at, proctime) < 15;
+```
+
+You should now see a new dynamic table containing only the rows that match the filter.
+
+
+Now try a [Top-N]({{< ref "docs/dev/table/sql/queries/topn" >}}) query to find 
the 10 most popular games.
+
+Top-N queries identify the N smallest or largest values (as ordered by some 
attribute of the table), and are useful when you need to identify the top (or 
bottom) N items in a stream. 
+
+```sql
+SELECT game_name, proctime, viewer_count
+FROM (
+    SELECT *, ROW_NUMBER() OVER (ORDER BY viewer_count DESC) AS ranking
+    FROM (
+        SELECT *
+        FROM (
+            SELECT 
+                *,
+                ROW_NUMBER() OVER (PARTITION BY game_name ORDER BY proctime 
DESC) AS ordering
+            FROM twitch_stream
+        ) WHERE ordering = 1
+    )
+) WHERE ranking <= 10;
+```
+
+Note that Flink uses the combination of an `OVER` window clause and a filter condition to express a Top-N query, which makes it possible to rank an unbounded dataset.
+
+You should now see a new dynamic table with the results from this query.  
Notice how the top 10 games are constantly being revised as new data is 
processed.
+
+
+## Step 6: Aggregate the data and learn about windowing 
+
+Now try an aggregate function (`COUNT`, `SUM`, `AVG`, `MIN`, `MAX`) to find the most popular games per location with the following query:
+
+```sql
+SELECT location,
+       game_name,
+       COUNT(game_name) 
+FROM twitch_stream
+WHERE location IS NOT NULL
+GROUP BY location, game_name;
+```
+
+You should see a continuously changing result showing, per country, a running count for each game, which lets you see the most popular games in each location.
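+
+A continuously updating aggregation like this emits updates (and retractions of previously emitted rows) rather than a simple append-only stream. If you want to observe those individual changes, you can switch the client's result mode before re-running the query (an optional side step; the default table mode works fine for this tutorial):
+
+```sql
+SET 'sql-client.execution.result-mode' = 'changelog';
+```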
+
+For the last example, let's try one of the most common operations in streaming SQL: computing windowed aggregations.
+
+Windows are the fundamental building blocks when it comes to processing 
infinite streams since they provide a way to split streams into finite chunks. 
+Flink SQL offers several [window functions]({{< ref 
"docs/dev/table/sql/queries/window-tvf" >}}) that you can use.  
+
+Let's explore how to apply one of these functions! To calculate the total viewer count per game over 1-minute windows, you can use the [tumble]({{< ref "docs/dev/table/sql/queries/window-tvf#tumble" >}}) function.
+
+Tumbling windows can be thought of as mini-batches of aggregations over a 
non-overlapping window of time.
+
+```sql
+SELECT window_start,
+       window_end,
+       game_name,
+       SUM(viewer_count) AS TotalViewerCount
+  FROM TABLE(
+    TUMBLE(TABLE twitch_stream, DESCRIPTOR(proctime), INTERVAL '1' MINUTE))
+  GROUP BY window_start, window_end, game_name;
+```
+
+Fresh results will appear every minute, showing the sum of the viewer counts 
for each game.
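+
+If you wanted overlapping windows instead, for example a one-minute window that advances every 30 seconds, the [HOP]({{< ref "docs/dev/table/sql/queries/window-tvf" >}}) (sliding) window function follows the same pattern. A sketch:
+
+```sql
+SELECT window_start,
+       window_end,
+       game_name,
+       SUM(viewer_count) AS TotalViewerCount
+  FROM TABLE(
+    HOP(TABLE twitch_stream, DESCRIPTOR(proctime), INTERVAL '30' SECOND, INTERVAL '1' MINUTE))
+  GROUP BY window_start, window_end, game_name;
+```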
+
+
+## Step 7: Write the updated stream to persistent storage
+
+Now that you have written a program with Flink SQL to process all that streaming data, you probably want to store the results somewhere. Since you are dealing with unbounded data, you can think of this as maintaining materialized views (snapshots of the query results) in external storage systems.
+
+Flink does not provide its own storage system but instead offers many sink 
connectors you can use to write table updates to external systems.
+
+To write a table to a CSV file, you can use the [FileSystem connector]({{< ref 
"docs/connectors/table/filesystem" >}}) that is built into Flink. This 
connector supports row-encoded and bulk-encoded formats. Row-encoded formats 
such as CSV and JSON write one row at a time to a file. Bulk-encoded formats 
collect a batch of rows in memory and organize them in a 
storage-and-scan-efficient format before writing out the data. 
+
+To store the results of the last example (the tumbling window query), you first need to create a table to write them into:
+
+
+```sql
+CREATE TABLE twitch_stream_viewer_count (
+  `window_start` TIMESTAMP_LTZ(3),
+  `window_end` TIMESTAMP_LTZ(3),
+  `game_name` VARCHAR(2147483647),
+  `TotalViewerCount` INT
+) WITH (
+  'connector' = 'filesystem',
+  'path' = 'file:///path/to/directory/results.csv',
+  'format' = 'csv'
+);
+```
+
+Now insert the results of the tumbling window query into the filesystem table:
+
+```sql
+INSERT INTO twitch_stream_viewer_count
+SELECT window_start,
+       window_end,
+       game_name,
+       SUM(viewer_count) AS TotalViewerCount
+  FROM TABLE(
+    TUMBLE(TABLE twitch_stream, DESCRIPTOR(proctime), INTERVAL '1' MINUTE))
+  GROUP BY window_start, window_end, game_name;
+```

Review comment:
       @alpinegizmo Can you help me with the insert query into the sink?



