twalthr commented on a change in pull request #460:
URL: https://github.com/apache/flink-web/pull/460#discussion_r690995064



##########
File path: README.md
##########
@@ -1,29 +1,34 @@
-# flink-web
+# Flink Web

Review comment:
       the repository is called `flink-web`, I would keep this

##########
File path: _posts/2021-08-16-connector-table-sql-api-part1.md
##########
@@ -0,0 +1,234 @@
+---
+layout: post
+title:  "Implementing a custom source connector for Table API and SQL - Part 
One "
+date: 2021-08-18T00:00:00.000Z
+authors:
+- Ingo Buerk:
+  name: "Ingo Buerk"
+excerpt: 
+---
+
+{% toc %}
+
+# Introduction
+
+Apache Flink is a data processing engine that keeps 
[state](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/)
 locally in order to do computations but does not store data. This means that 
it does not include its own fault-tolerant storage component by default and 
relies on external systems to ingest and persist data. Connecting to external 
data input (**sources**) and external data storage (**sinks**) is achieved with 
interfaces called **connectors**.   
+
+Since connectors are such important components, Flink ships with [connectors 
for some popular 
systems](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/).
 But sometimes you may need to read in an uncommon data format and what Flink 
provides is not enough. This is why Flink also provides [APIs](#) for building 
custom connectors if you want to connect to a system that is not supported by 
an existing connector.   
+
+Once you have a source and a sink defined for Flink, you can use its 
declarative APIs (in the form of the [Table API and 
SQL](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/overview/))
 to execute queries for data analysis without modification to the underlying 
data.  

Review comment:
       > without modification to the underlying data
   
   for sinks your are modifying data, maybe drop `and a sink`

##########
File path: _posts/2021-08-16-connector-table-sql-api-part1.md
##########
@@ -0,0 +1,234 @@
+---
+layout: post
+title:  "Implementing a custom source connector for Table API and SQL - Part 
One "
+date: 2021-08-18T00:00:00.000Z
+authors:
+- Ingo Buerk:
+  name: "Ingo Buerk"
+excerpt: 
+---
+
+{% toc %}
+
+# Introduction
+
+Apache Flink is a data processing engine that keeps 
[state](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/)
 locally in order to do computations but does not store data. This means that 
it does not include its own fault-tolerant storage component by default and 
relies on external systems to ingest and persist data. Connecting to external 
data input (**sources**) and external data storage (**sinks**) is achieved with 
interfaces called **connectors**.   
+
+Since connectors are such important components, Flink ships with [connectors 
for some popular 
systems](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/).
 But sometimes you may need to read in an uncommon data format and what Flink 
provides is not enough. This is why Flink also provides [APIs](#) for building 
custom connectors if you want to connect to a system that is not supported by 
an existing connector.   
+
+Once you have a source and a sink defined for Flink, you can use its 
declarative APIs (in the form of the [Table API and 
SQL](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/overview/))
 to execute queries for data analysis without modification to the underlying 
data.  
+
+The **Table API** has the same operations as **SQL** but it extends and 
improves SQL's functionality. It is named Table API because of its relational 
functions on tables: how to obtain a table, how to output a table, and how to 
perform query operation on the table.
+
+In this two-part tutorial, you will explore some of these APIs and concepts by 
implementing your own custom source connector for reading in data from a 
mailbox. You will use Flink to process an email inbox through the IMAP protocol 
and sort them out by subject to a sink. 

Review comment:
       link to the IMAP wikipedia article?

##########
File path: _posts/2021-08-16-connector-table-sql-api-part1.md
##########
@@ -0,0 +1,234 @@
+---
+layout: post
+title:  "Implementing a custom source connector for Table API and SQL - Part 
One "
+date: 2021-08-18T00:00:00.000Z
+authors:
+- Ingo Buerk:
+  name: "Ingo Buerk"
+excerpt: 
+---
+
+{% toc %}
+
+# Introduction
+
+Apache Flink is a data processing engine that keeps 
[state](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/)
 locally in order to do computations but does not store data. This means that 
it does not include its own fault-tolerant storage component by default and 
relies on external systems to ingest and persist data. Connecting to external 
data input (**sources**) and external data storage (**sinks**) is achieved with 
interfaces called **connectors**.   
+
+Since connectors are such important components, Flink ships with [connectors 
for some popular 
systems](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/).
 But sometimes you may need to read in an uncommon data format and what Flink 
provides is not enough. This is why Flink also provides [APIs](#) for building 
custom connectors if you want to connect to a system that is not supported by 
an existing connector.   
+
+Once you have a source and a sink defined for Flink, you can use its 
declarative APIs (in the form of the [Table API and 
SQL](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/overview/))
 to execute queries for data analysis without modification to the underlying 
data.  
+
+The **Table API** has the same operations as **SQL** but it extends and 
improves SQL's functionality. It is named Table API because of its relational 
functions on tables: how to obtain a table, how to output a table, and how to 
perform query operation on the table.
+
+In this two-part tutorial, you will explore some of these APIs and concepts by 
implementing your own custom source connector for reading in data from a 
mailbox. You will use Flink to process an email inbox through the IMAP protocol 
and sort them out by subject to a sink. 
+
+Part one will focus on building a custom source connector and [part two](#) 
will focus on integrating it. 
+
+# Goals
+
+Part one of this tutorial will teach you how to build and run a custom source 
connector to be used with Table API and SQL, two high-level abstractions in 
Flink.
+
+You are encouraged to follow along with the code in this 
[repository](github.com/Airblader/blog-imap). It provides a boilerplate project 
that also comes with a bundled 
[docker-compose](https://docs.docker.com/compose/) setup that lets you easily 
run the connector. You can then try it out with Flink’s SQL client.
+
+
+# Prerequisites
+
+This tutorial assumes that you have some familiarity with Java and 
objected-oriented programming. 
+
+It would also be useful to have 
[docker-compose](https://docs.docker.com/compose/install/) installed on your 
system in order to use the script included in the repository that builds and 
runs the connector. 
+
+
+# Understand the infrastructure required for a connector
+
+In order to create a connector which works with Flink, you need:
+
+1. A _factory class_ (a blueprint for creating other objects) that tells Flink 
with which identifier (in this case, “imap”) our connector can be addressed, 
which configuration options it exposes, and how the connector can be 
instantiated. Since Flink uses the Java Service Provider Interface (SPI) to 
discover factories located in different modules, you will also need to add some 
configuration details.

Review comment:
       ```suggestion
   1. A _factory class_ (a blueprint for creating other objects from string 
properties) that tells Flink with which identifier (in this case, “imap”) our 
connector can be addressed, which configuration options it exposes, and how the 
connector can be instantiated. Since Flink uses the Java Service Provider 
Interface (SPI) to discover factories located in different modules, you will 
also need to add some configuration details.
   ```

##########
File path: _posts/2021-08-16-connector-table-sql-api-part1.md
##########
@@ -0,0 +1,234 @@
+---
+layout: post
+title:  "Implementing a custom source connector for Table API and SQL - Part 
One "
+date: 2021-08-18T00:00:00.000Z
+authors:
+- Ingo Buerk:
+  name: "Ingo Buerk"
+excerpt: 
+---
+
+{% toc %}
+
+# Introduction
+
+Apache Flink is a data processing engine that keeps 
[state](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/)
 locally in order to do computations but does not store data. This means that 
it does not include its own fault-tolerant storage component by default and 
relies on external systems to ingest and persist data. Connecting to external 
data input (**sources**) and external data storage (**sinks**) is achieved with 
interfaces called **connectors**.   

Review comment:
       ```suggestion
   Apache Flink is a data processing engine that aims to keep 
[state](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/)
 locally in order to do computations efficiently. However, Flink does not "own" 
the data but relies on external systems to ingest and persist data. Connecting 
to external data input (**sources**) and external data storage (**sinks**) is 
usually summarized under the term **connectors** in Flink.   
   ```

##########
File path: _posts/2021-08-16-connector-table-sql-api-part1.md
##########
@@ -0,0 +1,234 @@
+---
+layout: post
+title:  "Implementing a custom source connector for Table API and SQL - Part 
One "
+date: 2021-08-18T00:00:00.000Z
+authors:
+- Ingo Buerk:
+  name: "Ingo Buerk"
+excerpt: 
+---
+
+{% toc %}
+
+# Introduction
+
+Apache Flink is a data processing engine that keeps 
[state](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/)
 locally in order to do computations but does not store data. This means that 
it does not include its own fault-tolerant storage component by default and 
relies on external systems to ingest and persist data. Connecting to external 
data input (**sources**) and external data storage (**sinks**) is achieved with 
interfaces called **connectors**.   
+
+Since connectors are such important components, Flink ships with [connectors 
for some popular 
systems](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/).
 But sometimes you may need to read in an uncommon data format and what Flink 
provides is not enough. This is why Flink also provides [APIs](#) for building 
custom connectors if you want to connect to a system that is not supported by 
an existing connector.   
+
+Once you have a source and a sink defined for Flink, you can use its 
declarative APIs (in the form of the [Table API and 
SQL](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/overview/))
 to execute queries for data analysis without modification to the underlying 
data.  
+
+The **Table API** has the same operations as **SQL** but it extends and 
improves SQL's functionality. It is named Table API because of its relational 
functions on tables: how to obtain a table, how to output a table, and how to 
perform query operation on the table.
+
+In this two-part tutorial, you will explore some of these APIs and concepts by 
implementing your own custom source connector for reading in data from a 
mailbox. You will use Flink to process an email inbox through the IMAP protocol 
and sort them out by subject to a sink. 
+
+Part one will focus on building a custom source connector and [part two](#) 
will focus on integrating it. 
+
+# Goals
+
+Part one of this tutorial will teach you how to build and run a custom source 
connector to be used with Table API and SQL, two high-level abstractions in 
Flink.
+
+You are encouraged to follow along with the code in this 
[repository](github.com/Airblader/blog-imap). It provides a boilerplate project 
that also comes with a bundled 
[docker-compose](https://docs.docker.com/compose/) setup that lets you easily 
run the connector. You can then try it out with Flink’s SQL client.
+
+
+# Prerequisites
+
+This tutorial assumes that you have some familiarity with Java and 
objected-oriented programming. 
+
+It would also be useful to have 
[docker-compose](https://docs.docker.com/compose/install/) installed on your 
system in order to use the script included in the repository that builds and 
runs the connector. 
+
+
+# Understand the infrastructure required for a connector
+
+In order to create a connector which works with Flink, you need:
+
+1. A _factory class_ (a blueprint for creating other objects) that tells Flink 
with which identifier (in this case, “imap”) our connector can be addressed, 
which configuration options it exposes, and how the connector can be 
instantiated. Since Flink uses the Java Service Provider Interface (SPI) to 
discover factories located in different modules, you will also need to add some 
configuration details.
+
+2. The _table source_ object as a specific instance of the connector during 
the planning stage. It tells Flink some information about this instance and how 
it can create the connector runtime implementation. There are also more 
advanced features, such as 
[abilities](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/abilities/package-summary.html),
 that can be implemented to improve connector performance.
+
+3. A _runtime implementation_ from the connector obtained during the planning 
stage. The runtime logic is implemented in Flink's core connector interfaces 
and does the actual work of producing rows of dynamic table data. The runtime 
instances are shipped to the Flink cluster. 
+
+Let us look at  this sequence (factory class → table source → runtime 
implementation) in reverse order.

Review comment:
       nit: double space

##########
File path: README.md
##########
@@ -1,29 +1,34 @@
-# flink-web
+# Flink Web
 
 This repository contains the Flink website: https://flink.apache.org/.
 
-You find instructions for this repository here: 
https://flink.apache.org/contributing/improve-website.html.
+You can find instructions for contributing to this repository here: 
https://flink.apache.org/contributing/improve-website.html.
 
 ## Testing changes locally
 
-You can build the website using Docker such as below (without augmenting your 
host environment). Parameters passed as 
-part of this call will be forwarded to `build.sh`. The `-i` option can be used 
to enable incremental builds.
+You can build the website using Docker (without changing your host 
environment) by using the provided script as shown below. 
+Parameters passed as part of this call will be forwarded to `build.sh`. The 
`-i` option can be used to enable incremental builds.
+
 ```bash
-# starts website with future post being disabled
-bash docker-build.sh -p
+# starts website with future posts disabled
+docker-build.sh -p

Review comment:
       `bash` had actually a purpose, because the command itself is not 
runnable. but we can achieve the same by calling `./docker-build.sh -p`

##########
File path: _posts/2021-08-16-connector-table-sql-api-part1.md
##########
@@ -0,0 +1,234 @@
+---
+layout: post
+title:  "Implementing a custom source connector for Table API and SQL - Part 
One "
+date: 2021-08-18T00:00:00.000Z
+authors:
+- Ingo Buerk:
+  name: "Ingo Buerk"
+excerpt: 
+---
+
+{% toc %}
+
+# Introduction
+
+Apache Flink is a data processing engine that keeps 
[state](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/)
 locally in order to do computations but does not store data. This means that 
it does not include its own fault-tolerant storage component by default and 
relies on external systems to ingest and persist data. Connecting to external 
data input (**sources**) and external data storage (**sinks**) is achieved with 
interfaces called **connectors**.   

Review comment:
       I found the previous wording very negative.

##########
File path: _posts/2021-08-16-connector-table-sql-api-part1.md
##########
@@ -0,0 +1,234 @@
+---
+layout: post
+title:  "Implementing a custom source connector for Table API and SQL - Part 
One "
+date: 2021-08-18T00:00:00.000Z
+authors:
+- Ingo Buerk:
+  name: "Ingo Buerk"
+excerpt: 
+---
+
+{% toc %}
+
+# Introduction
+
+Apache Flink is a data processing engine that keeps 
[state](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/)
 locally in order to do computations but does not store data. This means that 
it does not include its own fault-tolerant storage component by default and 
relies on external systems to ingest and persist data. Connecting to external 
data input (**sources**) and external data storage (**sinks**) is achieved with 
interfaces called **connectors**.   
+
+Since connectors are such important components, Flink ships with [connectors 
for some popular 
systems](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/).
 But sometimes you may need to read in an uncommon data format and what Flink 
provides is not enough. This is why Flink also provides [APIs](#) for building 
custom connectors if you want to connect to a system that is not supported by 
an existing connector.   
+
+Once you have a source and a sink defined for Flink, you can use its 
declarative APIs (in the form of the [Table API and 
SQL](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/overview/))
 to execute queries for data analysis without modification to the underlying 
data.  
+
+The **Table API** has the same operations as **SQL** but it extends and 
improves SQL's functionality. It is named Table API because of its relational 
functions on tables: how to obtain a table, how to output a table, and how to 
perform query operation on the table.
+
+In this two-part tutorial, you will explore some of these APIs and concepts by 
implementing your own custom source connector for reading in data from a 
mailbox. You will use Flink to process an email inbox through the IMAP protocol 
and sort them out by subject to a sink. 
+
+Part one will focus on building a custom source connector and [part two](#) 
will focus on integrating it. 
+
+# Goals
+
+Part one of this tutorial will teach you how to build and run a custom source 
connector to be used with Table API and SQL, two high-level abstractions in 
Flink.
+
+You are encouraged to follow along with the code in this 
[repository](github.com/Airblader/blog-imap). It provides a boilerplate project 
that also comes with a bundled 
[docker-compose](https://docs.docker.com/compose/) setup that lets you easily 
run the connector. You can then try it out with Flink’s SQL client.
+
+
+# Prerequisites
+
+This tutorial assumes that you have some familiarity with Java and 
objected-oriented programming. 
+
+It would also be useful to have 
[docker-compose](https://docs.docker.com/compose/install/) installed on your 
system in order to use the script included in the repository that builds and 
runs the connector. 
+
+
+# Understand the infrastructure required for a connector
+
+In order to create a connector which works with Flink, you need:
+
+1. A _factory class_ (a blueprint for creating other objects) that tells Flink 
with which identifier (in this case, “imap”) our connector can be addressed, 
which configuration options it exposes, and how the connector can be 
instantiated. Since Flink uses the Java Service Provider Interface (SPI) to 
discover factories located in different modules, you will also need to add some 
configuration details.
+
+2. The _table source_ object as a specific instance of the connector during 
the planning stage. It tells Flink some information about this instance and how 
it can create the connector runtime implementation. There are also more 
advanced features, such as 
[abilities](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/abilities/package-summary.html),
 that can be implemented to improve connector performance.
+
+3. A _runtime implementation_ from the connector obtained during the planning 
stage. The runtime logic is implemented in Flink's core connector interfaces 
and does the actual work of producing rows of dynamic table data. The runtime 
instances are shipped to the Flink cluster. 
+
+Let us look at  this sequence (factory class → table source → runtime 
implementation) in reverse order.
+
+# Establish the runtime implementation of the connector
+
+You first need to have a source connector which can be used in Flink's runtime 
system, defining how data goes in and how it can be executed in the cluster. 
There are a few different interfaces available for implementing the actual 
source of the data and have it be discoverable in Flink.  
+
+For complex connectors, you may want to implement the [Source 
interface](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/Source.html)
 which gives you a lot of control. For simpler use cases, you can use the 
[SourceFunction 
interface](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html),
 which is the base interface for all stream data sources in Flink. There are 
already a few different implementations of SourceFunction interfaces for common 
use cases such as the 
[FromElementsFunction](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.html)
 class and the 
[RichSourceFunction](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/RichSourceFunction.html)
 class. You will use the latter.  
+
+`RichSourceFunction` is a base class for implementing a parallel data source 
that has access to context information and some lifecycle methods. There is a 
`run()` method inherited from the `SourceFunction` interface that you need to 
implement. It is invoked once and can be used to produce the data either once 
for a bounded result or within a loop for an unbounded stream.
+
+For example, to create a bounded data source, you could implement this method 
so that it reads all existing emails and then closes. To create an unbounded 
source, you could only look at new emails coming in while the source is active. 
You can also combine these behaviors and expose them through configuration 
options.
+
+When you first create the class and implement the interface, it should look 
something like this:
+
+```java
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.table.data.RowData;
+
+public class ImapSourceFunction extends RichSourceFunction<RowData> {
+  @Override
+  public void run(SourceContext<RowData> ctx) throws Exception {}
+
+  @Override
+  public void cancel() {}
+}
+```
+
+In the `run()` method, you get access to a 
[context](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.SourceContext.html)
 object inherited from the SourceFunction interface, which is a bridge to Flink 
and allows you to output data. Since the source does not produce any data yet, 
the next step is to make it produce some static data in order to test that the 
data flows correctly: 
+
+```java
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData
+
+public class ImapSourceFunction extends RichSourceFunction<RowData> {
+  @Override
+  public void run(SourceContext<RowData> ctx) throws Exception {
+      ctx.collect(GenericRowData.of(
+          StringData.fromString("Subject 1"), 
+          StringData.fromString("Hello, World!")
+      ));
+  }
+
+  @Override
+  public void cancel() {}
+}
+```
+
+// explain the collect function?
+
+You do not need to implement the `cancel()` method yet because the source 
finishes instantly. 
+
+
+# Create and configure a dynamic table source for the data stream
+
+[Dynamic 
tables](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/concepts/dynamic_tables/)
 are the core concept of Flink’s Table API and SQL support for streaming data 
and, like its name suggests, change over time. You can imagine a data stream 
being logically converted into a table that is constantly changing. For this 
tutorial, the emails that will be read in will be interpreted as a (source) 
table that is queryable. It can be viewed as a specific instance of a connector 
class. 
+
+You will now implement a 
[DynamicTableSource](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/DynamicTableSource.html)
 interface. There are two types of dynamic table sources: 
[ScanTableSource](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/ScanTableSource.html)
 and 
[LookupTableSource](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/LookupTableSource.html).
 Scan sources read the entire table on the external system while lookup sources 
look for specific rows based on keys. The former will fit the use case of this 
tutorial. 
+
+This is what a scan table source implementation would look like:
+
+```java
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+
+public class ImapTableSource implements ScanTableSource {
+  @Override
+  public ChangelogMode getChangelogMode() {
+    return ChangelogMode.insertOnly();
+  }
+
+  @Override
+  public ScanRuntimeProvider getScanRuntimeProvider(ScanContext ctx) {
+    boolean bounded = true;
+    final ImapSourceFunction sourceFunction = new ImapSourceFunction();
+    return SourceFunctionProvider.of(sourceFunction, bounded);
+  }
+
+  @Override
+  public DynamicTableSource copy() {
+    return new ImapTableSource();
+  }
+
+  @Override
+  public String asSummaryString() {
+    return "IMAP Table Source";
+  }
+}
+```
+
+[ChangelogMode](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/ChangelogMode.html)
 informs Flink of expected changes that the planner can expect during runtime. 
For example, whether the source produces only new rows, also updates to 
existing ones, or whether it can remove previously produced rows. Our source 
will only produce (`insertOnly()`) new rows.
+
+[ScanRuntimeProvider](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/ScanTableSource.ScanRuntimeProvider.html)
 allows Flink to create the actual runtime implementation you established 
previously (for reading the data). Flink even provides utilities like 
[SourceFunctionProvider](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/SourceFunctionProvider.html)
 to wrap it into an instance of 
[SourceFunction](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html),
 which is the base unit of all stream data sources in Flink.  

Review comment:
       > which is the base unit of all stream data sources in Flink
   
   not true, better "one of the base runtime interfaces"

##########
File path: _posts/2021-08-16-connector-table-sql-api-part1.md
##########
@@ -0,0 +1,234 @@
+---
+layout: post
+title:  "Implementing a custom source connector for Table API and SQL - Part 
One "
+date: 2021-08-18T00:00:00.000Z
+authors:
+- Ingo Buerk:
+  name: "Ingo Buerk"
+excerpt: 
+---
+
+{% toc %}
+
+# Introduction
+
+Apache Flink is a data processing engine that keeps 
[state](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/)
 locally in order to do computations but does not store data. This means that 
it does not include its own fault-tolerant storage component by default and 
relies on external systems to ingest and persist data. Connecting to external 
data input (**sources**) and external data storage (**sinks**) is achieved with 
interfaces called **connectors**.   
+
+Since connectors are such important components, Flink ships with [connectors 
for some popular 
systems](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/).
 But sometimes you may need to read in an uncommon data format and what Flink 
provides is not enough. This is why Flink also provides [APIs](#) for building 
custom connectors if you want to connect to a system that is not supported by 
an existing connector.   
+
+Once you have a source and a sink defined for Flink, you can use its 
declarative APIs (in the form of the [Table API and 
SQL](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/overview/))
 to execute queries for data analysis without modification to the underlying 
data.  
+
+The **Table API** has the same operations as **SQL** but it extends and 
improves SQL's functionality. It is named Table API because of its relational 
functions on tables: how to obtain a table, how to output a table, and how to 
perform query operation on the table.
+
+In this two-part tutorial, you will explore some of these APIs and concepts by 
implementing your own custom source connector for reading in data from a 
mailbox. You will use Flink to process an email inbox through the IMAP protocol 
and sort them out by subject to a sink. 
+
+Part one will focus on building a custom source connector and [part two](#) 
will focus on integrating it. 
+
+# Goals
+
+Part one of this tutorial will teach you how to build and run a custom source 
connector to be used with Table API and SQL, two high-level abstractions in 
Flink.
+
+You are encouraged to follow along with the code in this 
[repository](github.com/Airblader/blog-imap). It provides a boilerplate project 
that also comes with a bundled 
[docker-compose](https://docs.docker.com/compose/) setup that lets you easily 
run the connector. You can then try it out with Flink’s SQL client.
+
+
+# Prerequisites
+
+This tutorial assumes that you have some familiarity with Java and 
objected-oriented programming. 
+
+It would also be useful to have 
[docker-compose](https://docs.docker.com/compose/install/) installed on your 
system in order to use the script included in the repository that builds and 
runs the connector. 
+
+
+# Understand the infrastructure required for a connector
+
+In order to create a connector which works with Flink, you need:
+
+1. A _factory class_ (a blueprint for creating other objects) that tells Flink 
with which identifier (in this case, “imap”) our connector can be addressed, 
which configuration options it exposes, and how the connector can be 
instantiated. Since Flink uses the Java Service Provider Interface (SPI) to 
discover factories located in different modules, you will also need to add some 
configuration details.
+
+2. The _table source_ object as a specific instance of the connector during 
the planning stage. It tells Flink some information about this instance and how 
it can create the connector runtime implementation. There are also more 
advanced features, such as 
[abilities](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/abilities/package-summary.html),
 that can be implemented to improve connector performance.

Review comment:
       mention that is responsible for  back and forth communicating with the 
optimizer during planning and is kind of a another factory for creating 
connector runtime implementation

##########
File path: _posts/2021-08-16-connector-table-sql-api-part1.md
##########
@@ -0,0 +1,234 @@
+---
+layout: post
+title:  "Implementing a custom source connector for Table API and SQL - Part 
One "
+date: 2021-08-18T00:00:00.000Z
+authors:
+- Ingo Buerk:
+  name: "Ingo Buerk"
+excerpt: 
+---
+
+{% toc %}
+
+# Introduction
+
+Apache Flink is a data processing engine that keeps 
[state](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/)
 locally in order to do computations but does not store data. This means that 
it does not include its own fault-tolerant storage component by default and 
relies on external systems to ingest and persist data. Connecting to external 
data input (**sources**) and external data storage (**sinks**) is achieved with 
interfaces called **connectors**.   
+
+Since connectors are such important components, Flink ships with [connectors 
for some popular 
systems](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/).
 But sometimes you may need to read in an uncommon data format and what Flink 
provides is not enough. This is why Flink also provides [APIs](#) for building 
custom connectors if you want to connect to a system that is not supported by 
an existing connector.   
+
+Once you have a source and a sink defined for Flink, you can use its 
declarative APIs (in the form of the [Table API and 
SQL](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/overview/))
 to execute queries for data analysis without modification to the underlying 
data.  
+
+The **Table API** has the same operations as **SQL** but it extends and 
improves SQL's functionality. It is named Table API because of its relational 
functions on tables: how to obtain a table, how to output a table, and how to 
perform query operation on the table.
+
+In this two-part tutorial, you will explore some of these APIs and concepts by 
implementing your own custom source connector for reading in data from a 
mailbox. You will use Flink to process an email inbox through the IMAP protocol 
and sort them out by subject to a sink. 
+
+Part one will focus on building a custom source connector and [part two](#) 
will focus on integrating it. 
+
+# Goals
+
+Part one of this tutorial will teach you how to build and run a custom source 
connector to be used with Table API and SQL, two high-level abstractions in 
Flink.
+
+You are encouraged to follow along with the code in this 
[repository](github.com/Airblader/blog-imap). It provides a boilerplate project 
that also comes with a bundled 
[docker-compose](https://docs.docker.com/compose/) setup that lets you easily 
run the connector. You can then try it out with Flink’s SQL client.
+
+
+# Prerequisites
+
+This tutorial assumes that you have some familiarity with Java and 
objected-oriented programming. 
+
+It would also be useful to have 
[docker-compose](https://docs.docker.com/compose/install/) installed on your 
system in order to use the script included in the repository that builds and 
runs the connector. 
+
+
+# Understand the infrastructure required for a connector
+
+In order to create a connector which works with Flink, you need:
+
+1. A _factory class_ (a blueprint for creating other objects) that tells Flink 
with which identifier (in this case, “imap”) our connector can be addressed, 
which configuration options it exposes, and how the connector can be 
instantiated. Since Flink uses the Java Service Provider Interface (SPI) to 
discover factories located in different modules, you will also need to add some 
configuration details.
+
+2. The _table source_ object as a specific instance of the connector during 
the planning stage. It tells Flink some information about this instance and how 
it can create the connector runtime implementation. There are also more 
advanced features, such as 
[abilities](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/abilities/package-summary.html),
 that can be implemented to improve connector performance.
+
+3. A _runtime implementation_ from the connector obtained during the planning 
stage. The runtime logic is implemented in Flink's core connector interfaces 
and does the actual work of producing rows of dynamic table data. The runtime 
instances are shipped to the Flink cluster. 
+
+Let us look at  this sequence (factory class → table source → runtime 
implementation) in reverse order.
+
+# Establish the runtime implementation of the connector
+
+You first need to have a source connector which can be used in Flink's runtime 
system, defining how data goes in and how it can be executed in the cluster. 
There are a few different interfaces available for implementing the actual 
source of the data and have it be discoverable in Flink.  
+
+For complex connectors, you may want to implement the [Source 
interface](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/Source.html)
 which gives you a lot of control. For simpler use cases, you can use the 
[SourceFunction 
interface](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html),
 which is the base interface for all stream data sources in Flink. There are 
already a few different implementations of SourceFunction interfaces for common 
use cases such as the 
[FromElementsFunction](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.html)
 class and the 
[RichSourceFunction](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/RichSourceFunction.html)
 class. You will use the latter.  

Review comment:
       for clarity, `Source` interface is the new abstraction whereas 
`SourceFunction` is the old. all connectors will implement `Source` eventually 
but currently more simplification is needed to expose it to users.
   
   > which is the base interface for all stream data sources in Flink
   
   this is not true, Kafka and Filesystem are already using the new abstraction

##########
File path: _posts/2021-08-16-connector-table-sql-api-part2.md
##########
@@ -0,0 +1,434 @@
+---
+layout: post
+title:  "Implementing a custom source connector for Table API and SQL - Part 
Two "
+date: 2021-08-18T00:00:00.000Z
+authors:
+- Ingo Buerk:
+  name: "Ingo Buerk"
+excerpt: 
+---
+
+{% toc %}
+
+# Introduction
+
+In [part one](#) of this tutorial, you learned how to build a custom source 
connector for Flink. In part two, you will learn how to integrate the connector 
with a test email inbox through the IMAP protocol, filter out emails, and 
execute [Flink SQL on the Ververica 
Platform](https://www.ververica.com/apache-flink-sql-on-ververica-platform). 
+
+# Goals
+
+Part two of the tutorial will teach you how to: 
+
+- integrate a source connector which connects to a mailbox using the IMAP 
protocol
+- use [Jakarta Mail](https://eclipse-ee4j.github.io/mail/), a Java library 
that can send and receive email via the IMAP protocol  
+- write [Flink 
SQL](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/overview/)
 and execute the queries in the Ververica Platform
+
+You are encouraged to follow along with the code in this 
[repository](github.com/Airblader/blog-imap). It provides a boilerplate project 
that also comes with a bundled 
[docker-compose](https://docs.docker.com/compose/) setup that lets you easily 
run the connector. You can then try it out with Flink’s SQL client.
+
+
+# Prerequisites
+
+This tutorial assumes that you have:
+
+- followed the steps outlined in [part one](#) of this tutorial
+- some familiarity with Java and objected-oriented programming
+
+
+# Understand how to fetch emails via the IMAP protocol
+
+Now that you have a working source connector that can run on Flink, it is time 
to connect to an email server via IMAP (an Internet protocol that allows email 
clients to retrieve messages from a mail server) so that Flink can process 
emails instead of test static data.  
+
+You will use [Jakarta Mail](https://eclipse-ee4j.github.io/mail/), a Java 
library that can be used to send and receive email via IMAP. For simplicity, 
authentication will use a plain username and password.
+
+This tutorial will focus more on how to implement a connector for Flink. If 
you want to learn more about the details of how IMAP or Jakarta Mail work, you 
are encouraged to explore a more extensive implementation at this 
[repository](github.com/Airblader/flink-connector-email). 
+
+In order to fetch emails, you will need to connect to the email server, 
register a listener for new emails and collect them whenever they arrive, and 
enter a loop to keep the connector running. 
+
+
+# Add configuration options - server information and credentials
+
+In order to connect to your IMAP server, you will need at least the following:
+
+- hostname (of the mail server)
+- port number
+- username
+- password
+
+You will start by creating a class to encapsulate the configuration options. 
You will make use of [Lombok](https://projectlombok.org/setup/maven) to help 
with some boilerplate code. By adding the `@Data` and `@Builder` annotations, 
Lombok will generate these for all the fields of the immutable class. 
+
+```java
+@Data
+@Builder
+public class ImapSourceOptions implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final String host;
+    private final Integer port;
+    private final String user;
+    private final String password;
+}
+```
+
+Now you can add an instance of this class to the `ImapSourceFunction` and 
`ImapTableSource` classes so it can be used there. Take note of the column 
names with which the table has been created. This will help later.
+
+// QUESTION: what would the column names be here??
+
+```java
+public class ImapSourceFunction extends RichSourceFunction<RowData> {
+    private final ImapSourceOptions options;
+    private final List<String> columnNames;
+
+    public ImapSourceFunction(
+        ImapSourceOptions options, 
+        List<String> columnNames
+    ) {
+        this.options = options;
+        this.columnNames = columnNames.stream()
+            .map(String::toUpperCase)
+            .collect(Collectors.toList());
+    }
+
+    // ...
+}
+```
+
+```java
+public class ImapTableSource implements ScanTableSource {
+
+    private final ImapSourceOptions options;
+    private final List<String> columnNames;
+
+    public ImapTableSource(
+        ImapSourceOptions options,
+        List<String> columnNames
+    ) {
+        this.options = options;
+        this.columnNames = columnNames;
+    }
+
+    // …
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext ctx) {
+        final ImapSourceFunction sourceFunction = new 
ImapSourceFunction(options, columnNames);
+        return SourceFunctionProvider.of(sourceFunction, true);
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        return new ImapTableSource(options, columnNames);
+    }
+
+    // …
+}
+```
+
+Finally, in the `ImapTableSourceFactory` class, you need to create a 
`ConfigOption<Type>Name` for the hostname, port number, username, and password. 
 Then you need to report them to Flink. Since all of the current options are 
mandatory, you can add them to the `requiredOptions()` method in order to do 
this. 
+
+```java
+public class ImapTableSourceFactory implements DynamicTableSourceFactory {
+
+    public static final ConfigOption<String> HOST = 
ConfigOptions.key("host").stringType().noDefaultValue();
+    public static final ConfigOption<Integer> PORT = 
ConfigOptions.key("port").intType().noDefaultValue();
+    public static final ConfigOption<String> USER = 
ConfigOptions.key("user").stringType().noDefaultValue();
+    public static final ConfigOption<String> PASSWORD = 
ConfigOptions.key("password").stringType().noDefaultValue();
+
+    // …
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(HOST);
+        options.add(PORT);
+        options.add(USER);
+        options.add(PASSWORD);
+        return options;
+    }
+
+    // …
+}
+```
+
+Now take a look at the `createDynamicTableSource()` function in the 
`ImapTableSouceFactory` class.  Recall that previously (in part one) you had 
created a small helper utility 
[TableFactoryHelper](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/factories/FactoryUtil.TableFactoryHelper.html),
 that Flink offers which ensures that required options are set and that no 
unknown options are provided. You can now use it to automatically make sure 
that the required options of hostname, port number, username, and password are 
all provided when creating a table using this connector. The helper function 
will throw an error message if one required option is missing. You can also use 
it to access the provided options (`getOptions()`), convert them into an 
instance of the `ImapTableSource` class created earlier, and provide the 
instance to the table source:
+
+// why would you want to do the latter??
+
+```java
+public class ImapTableSourceFactory implements DynamicTableSourceFactory {
+
+    // ...
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context ctx) {
+        final FactoryUtil.TableFactoryHelper factoryHelper = 
FactoryUtil.createTableFactoryHelper(this, ctx);
+        factoryHelper.validate();
+
+        final ImapSourceOptions options = ImapSourceOptions.builder()
+            .host(factoryHelper.getOptions().get(HOST))
+            .port(factoryHelper.getOptions().get(PORT))
+            .user(factoryHelper.getOptions().get(USER))
+            .password(factoryHelper.getOptions().get(PASSWORD))
+            .build();
+        final List<String> columnNames = 
ctx.getCatalogTable().getResolvedSchema().getColumnNames();
+        return new ImapTableSource(options, columnNames);
+    }
+}
+```
+
+To test these new configuration options, run:
+
+```sh
+$ cd testing/
+$ ./build_and_run.sh
+```
+
+Once you see the Flink SQL client start up, execute the following statements 
to create a table with your connector:
+
+```sql
+CREATE TABLE T (subject STRING, content STRING) WITH ('connector' = 'imap');
+
+SELECT * FROM T;
+```
+
+This time it will fail because the required options are not provided.  
+
+```
+[ERROR] Could not execute SQL statement. Reason:
+org.apache.flink.table.api.ValidationException: One or more required options 
are missing.
+
+Missing required options are:
+
+host
+password
+user
+``` 
+
+
+#  Connect to the source email server
+
+Now that you have configured the required options to connect to the email 
server, it is time to actually connect to the server. 
+
+Going back to the `ImapSourceFunction` class, you first need to convert the 
options given to the table source into a `Properties` object, which is what you 
can pass to the Jakarta library. You can also set various other properties here 
as well (i.e. enabling SSL).
+
+// is there more information on this properties object??
+
+```java
+public class ImapSourceFunction extends RichSourceFunction<RowData> {
+   // …
+
+   private Properties getSessionProperties() {
+        Properties props = new Properties();
+        props.put("mail.store.protocol", "imap");
+        props.put("mail.imap.auth", true);
+        props.put("mail.imap.host", options.getHost());
+        if (options.getPort() != null) {
+            props.put("mail.imap.port", options.getPort());
+        }
+
+        return props;
+    }
+}
+```
+
+Now create a method (`connect()`) which sets up the connection:
+
+```java 
+public class ImapSourceFunction extends RichSourceFunction<RowData> {
+    // …
+
+    private transient Store store;
+    private transient IMAPFolder folder;
+
+    private void connect() throws Exception {
+        var session = Session.getInstance(getSessionProperties(), null);
+        store = session.getStore();
+        store.connect(options.getUser(), options.getPassword());
+
+        var genericFolder = store.getFolder("INBOX");
+        folder = (IMAPFolder) genericFolder;
+
+        if (!folder.isOpen()) {
+            folder.open(Folder.READ_ONLY);
+        }
+    }
+}
+```
+
+You can now use this method to connect to the mail server when the source is 
created. Create a loop to keep the source running while collecting email 
counts. Lastly, implement methods to cancel and close the connection:
+
+```java
+public class ImapSourceFunction extends RichSourceFunction<RowData> {
+    private transient volatile boolean running = false;
+
+    // …
+
+    @Override
+    public void run(SourceFunction.SourceContext<RowData> ctx) throws 
Exception {
+        connect();
+        running = true;
+
+        // TODO: Listen for new messages
+
+        while (running) {

Review comment:
       mention that our source is not checkpointable, so no state fault 
tolerance will be possible

##########
File path: _posts/2021-08-16-connector-table-sql-api-part1.md
##########
@@ -0,0 +1,234 @@
+---
+layout: post
+title:  "Implementing a custom source connector for Table API and SQL - Part 
One "
+date: 2021-08-18T00:00:00.000Z
+authors:
+- Ingo Buerk:
+  name: "Ingo Buerk"
+excerpt: 
+---
+
+{% toc %}
+
+# Introduction
+
+Apache Flink is a data processing engine that keeps 
[state](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/)
 locally in order to do computations but does not store data. This means that 
it does not include its own fault-tolerant storage component by default and 
relies on external systems to ingest and persist data. Connecting to external 
data input (**sources**) and external data storage (**sinks**) is achieved with 
interfaces called **connectors**.   
+
+Since connectors are such important components, Flink ships with [connectors 
for some popular 
systems](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/).
 But sometimes you may need to read in an uncommon data format and what Flink 
provides is not enough. This is why Flink also provides [APIs](#) for building 
custom connectors if you want to connect to a system that is not supported by 
an existing connector.   
+
+Once you have a source and a sink defined for Flink, you can use its 
declarative APIs (in the form of the [Table API and 
SQL](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/overview/))
 to execute queries for data analysis without modification to the underlying 
data.  
+
+The **Table API** has the same operations as **SQL** but it extends and 
improves SQL's functionality. It is named Table API because of its relational 
functions on tables: how to obtain a table, how to output a table, and how to 
perform query operation on the table.

Review comment:
       +1

##########
File path: README.md
##########
@@ -1,29 +1,34 @@
-# flink-web
+# Flink Web
 
 This repository contains the Flink website: https://flink.apache.org/.
 
-You find instructions for this repository here: 
https://flink.apache.org/contributing/improve-website.html.
+You can find instructions for contributing to this repository here: 
https://flink.apache.org/contributing/improve-website.html.
 
 ## Testing changes locally
 
-You can build the website using Docker such as below (without augmenting your 
host environment). Parameters passed as 
-part of this call will be forwarded to `build.sh`. The `-i` option can be used 
to enable incremental builds.
+You can build the website using Docker (without changing your host 
environment) by using the provided script as shown below. 
+Parameters passed as part of this call will be forwarded to `build.sh`. The 
`-i` option can be used to enable incremental builds.
+
 ```bash
-# starts website with future post being disabled
-bash docker-build.sh -p
+# starts website with future posts disabled
+docker-build.sh -p
 
-# starts website including also future posts
-bash docker-build.sh -f
+# starts website including future posts
+docker-build.sh -f
 ```
 
-Both commands will start a webserver providing the website via 
`http://0.0.0.0:4000`.
+Both commands will start a web server hosting the website via 
`http://0.0.0.0:4000`.
+
+If a newly added blog post does not show up in the blogs overview page, build 
the website again without the `-i` option.
+You can also try deleting the "/content" directory before building the page 
locally. The entire "/content" directory will be
+regenerated and include the newly added blog post.
+
+## Building the website
 
-If a newly added blog post is not showing up on the index / blog overview 
page, build the website again without the `-i` option, or delete the "content" 
directory before building the page locally. The "content" directory will be 
regenerated completely, including the newly added blog post.
+The website needs to be rebuilt before being merged into the `asf-site` 
branch. When doing so, please *do not* use incremental builds. 
 
-## Building website
+You can execute the following command to rebuild non-incrementally:
 
-The site needs to be rebuild before merging into the branch asf-site.
-When doing so, DO NOT use incremental builds.
 ```bash
-bash docker-build.sh
+docker-build.sh

Review comment:
       usually, we split PRs into several commits. the changes to the readme 
have actually nothing to do with the blogpost itself and can be a separate 
`[hotfix]` commit.

##########
File path: _posts/2021-08-16-connector-table-sql-api-part1.md
##########
@@ -0,0 +1,234 @@
+---
+layout: post
+title:  "Implementing a custom source connector for Table API and SQL - Part 
One "

Review comment:
       Should we do tile style `Implementing a Custom Source Connector for 
Table API and SQL`?

##########
File path: _posts/2021-08-16-connector-table-sql-api-part1.md
##########
@@ -0,0 +1,234 @@
+---
+layout: post
+title:  "Implementing a custom source connector for Table API and SQL - Part 
One "
+date: 2021-08-18T00:00:00.000Z
+authors:
+- Ingo Buerk:
+  name: "Ingo Buerk"
+excerpt: 
+---
+
+{% toc %}
+
+# Introduction
+
+Apache Flink is a data processing engine that keeps 
[state](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/)
 locally in order to do computations but does not store data. This means that 
it does not include its own fault-tolerant storage component by default and 
relies on external systems to ingest and persist data. Connecting to external 
data input (**sources**) and external data storage (**sinks**) is achieved with 
interfaces called **connectors**.   
+
+Since connectors are such important components, Flink ships with [connectors 
for some popular 
systems](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/).
 But sometimes you may need to read in an uncommon data format and what Flink 
provides is not enough. This is why Flink also provides [APIs](#) for building 
custom connectors if you want to connect to a system that is not supported by 
an existing connector.   
+
+Once you have a source and a sink defined for Flink, you can use its 
declarative APIs (in the form of the [Table API and 
SQL](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/overview/))
 to execute queries for data analysis without modification to the underlying 
data.  
+
+The **Table API** has the same operations as **SQL** but it extends and 
improves SQL's functionality. It is named Table API because of its relational 
functions on tables: how to obtain a table, how to output a table, and how to 
perform query operation on the table.
+
+In this two-part tutorial, you will explore some of these APIs and concepts by 
implementing your own custom source connector for reading in data from a 
mailbox. You will use Flink to process an email inbox through the IMAP protocol 
and sort them out by subject to a sink. 
+
+Part one will focus on building a custom source connector and [part two](#) 
will focus on integrating it. 
+
+# Goals
+
+Part one of this tutorial will teach you how to build and run a custom source 
connector to be used with Table API and SQL, two high-level abstractions in 
Flink.
+
+You are encouraged to follow along with the code in this 
[repository](github.com/Airblader/blog-imap). It provides a boilerplate project 
that also comes with a bundled 
[docker-compose](https://docs.docker.com/compose/) setup that lets you easily 
run the connector. You can then try it out with Flink’s SQL client.
+
+
+# Prerequisites
+
+This tutorial assumes that you have some familiarity with Java and 
objected-oriented programming. 
+
+It would also be useful to have 
[docker-compose](https://docs.docker.com/compose/install/) installed on your 
system in order to use the script included in the repository that builds and 
runs the connector. 
+
+
+# Understand the infrastructure required for a connector
+
+In order to create a connector which works with Flink, you need:
+
+1. A _factory class_ (a blueprint for creating other objects) that tells Flink 
with which identifier (in this case, “imap”) our connector can be addressed, 
which configuration options it exposes, and how the connector can be 
instantiated. Since Flink uses the Java Service Provider Interface (SPI) to 
discover factories located in different modules, you will also need to add some 
configuration details.
+
+2. The _table source_ object as a specific instance of the connector during 
the planning stage. It tells Flink some information about this instance and how 
it can create the connector runtime implementation. There are also more 
advanced features, such as 
[abilities](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/abilities/package-summary.html),
 that can be implemented to improve connector performance.
+
+3. A _runtime implementation_ from the connector obtained during the planning 
stage. The runtime logic is implemented in Flink's core connector interfaces 
and does the actual work of producing rows of dynamic table data. The runtime 
instances are shipped to the Flink cluster. 
+
+Let us look at  this sequence (factory class → table source → runtime 
implementation) in reverse order.
+
+# Establish the runtime implementation of the connector
+
+You first need to have a source connector which can be used in Flink's runtime 
system, defining how data goes in and how it can be executed in the cluster. 
There are a few different interfaces available for implementing the actual 
source of the data and have it be discoverable in Flink.  
+
+For complex connectors, you may want to implement the [Source 
interface](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/Source.html)
 which gives you a lot of control. For simpler use cases, you can use the 
[SourceFunction 
interface](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html),
 which is the base interface for all stream data sources in Flink. There are 
already a few different implementations of SourceFunction interfaces for common 
use cases such as the 
[FromElementsFunction](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.html)
 class and the 
[RichSourceFunction](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/RichSourceFunction.html)
 class. You will use the latter.  
+
+`RichSourceFunction` is a base class for implementing a parallel data source 
that has access to context information and some lifecycle methods. There is a 
`run()` method inherited from the `SourceFunction` interface that you need to 
implement. It is invoked once and can be used to produce the data either once 
for a bounded result or within a loop for an unbounded stream.
+
+For example, to create a bounded data source, you could implement this method 
so that it reads all existing emails and then closes. To create an unbounded 
source, you could only look at new emails coming in while the source is active. 
You can also combine these behaviors and expose them through configuration 
options.
+
+When you first create the class and implement the interface, it should look 
something like this:
+
+```java
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.table.data.RowData;
+
+public class ImapSourceFunction extends RichSourceFunction<RowData> {
+  @Override
+  public void run(SourceContext<RowData> ctx) throws Exception {}
+
+  @Override
+  public void cancel() {}
+}
+```
+
+In the `run()` method, you get access to a 
[context](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.SourceContext.html)
 object inherited from the SourceFunction interface, which is a bridge to Flink 
and allows you to output data. Since the source does not produce any data yet, 
the next step is to make it produce some static data in order to test that the 
data flows correctly: 
+
+```java
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData
+
+public class ImapSourceFunction extends RichSourceFunction<RowData> {
+  @Override
+  public void run(SourceContext<RowData> ctx) throws Exception {
+      ctx.collect(GenericRowData.of(
+          StringData.fromString("Subject 1"), 
+          StringData.fromString("Hello, World!")
+      ));
+  }
+
+  @Override
+  public void cancel() {}
+}
+```
+
+// explain the collect function?
+
+You do not need to implement the `cancel()` method yet because the source 
finishes instantly. 
+
+
+# Create and configure a dynamic table source for the data stream
+
+[Dynamic 
tables](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/concepts/dynamic_tables/)
 are the core concept of Flink’s Table API and SQL support for streaming data 
and, like its name suggests, change over time. You can imagine a data stream 
being logically converted into a table that is constantly changing. For this 
tutorial, the emails that will be read in will be interpreted as a (source) 
table that is queryable. It can be viewed as a specific instance of a connector 
class. 
+
+You will now implement a 
[DynamicTableSource](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/DynamicTableSource.html)
 interface. There are two types of dynamic table sources: 
[ScanTableSource](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/ScanTableSource.html)
 and 
[LookupTableSource](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/LookupTableSource.html).
 Scan sources read the entire table on the external system while lookup sources 
look for specific rows based on keys. The former will fit the use case of this 
tutorial. 
+
+This is what a scan table source implementation would look like:
+
+```java
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+
+public class ImapTableSource implements ScanTableSource {
+  @Override
+  public ChangelogMode getChangelogMode() {
+    return ChangelogMode.insertOnly();
+  }
+
+  @Override
+  public ScanRuntimeProvider getScanRuntimeProvider(ScanContext ctx) {
+    boolean bounded = true;
+    final ImapSourceFunction sourceFunction = new ImapSourceFunction();
+    return SourceFunctionProvider.of(sourceFunction, bounded);
+  }
+
+  @Override
+  public DynamicTableSource copy() {
+    return new ImapTableSource();
+  }
+
+  @Override
+  public String asSummaryString() {
+    return "IMAP Table Source";
+  }
+}
+```
+
+[ChangelogMode](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/ChangelogMode.html)
 informs Flink of expected changes that the planner can expect during runtime. 
For example, whether the source produces only new rows, also updates to 
existing ones, or whether it can remove previously produced rows. Our source 
will only produce (`insertOnly()`) new rows.
+
+[ScanRuntimeProvider](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/ScanTableSource.ScanRuntimeProvider.html)
 allows Flink to create the actual runtime implementation you established 
previously (for reading the data). Flink even provides utilities like 
[SourceFunctionProvider](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/SourceFunctionProvider.html)
 to wrap it into an instance of 
[SourceFunction](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html),
 which is the base unit of all stream data sources in Flink.  
+
+You will also need to indicate whether the source is bounded or not. 
Currently, this is the case but you will have to change this later. 
+
+# Create a factory class for the connector so it can be discovered by Flink
+
+You now have a working source connector, but in order to use it in Table API 
or SQL, it needs to be discoverable by Flink. You also need to define how the 
connector is addressable from a SQL statement when creating a source table. 
+
+You need to implement a 
[Factory](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/factories/Factory.html),
 which is a base interface that create object instances from a list of 
key-value pairs in Flink's Table API and SQL.  A factory is uniquely identified 
by its class name and `factoryIdentifier()`.  For this tutorial, you will 
implement the more specific 
[DynamicTableSourceFactory](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/factories/DynamicTableSourceFactory.html),
 which allows you to configure a dynamic table connector as well as create 
`DynamicTableSource` instances.  
+
+```java
+import java.util.HashSet;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+public class ImapTableSourceFactory implements DynamicTableSourceFactory {
+  @Override
+  public String factoryIdentifier() {
+    return "imap";
+  }
+
+  @Override
+  public Set<ConfigOption<?>> requiredOptions() {
+    return new HashSet<>();
+  }
+
+  @Override
+  public Set<ConfigOption<?>> optionalOptions() {
+    return new HashSet<>();
+  }
+
+  @Override
+  public DynamicTableSource createDynamicTableSource(Context ctx) {
+    final FactoryUtil.TableFactoryHelper factoryHelper = 
FactoryUtil.createTableFactoryHelper(this, ctx);
+    factoryHelper.validate();
+
+    return new ImapTableSource();
+  }
+}
+```
+
+There are currently no configuration options but they can be added and also 
validated within the `createDynamicTableSource()` function. There is a small 
helper utility, 
[TableFactoryHelper](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/factories/FactoryUtil.TableFactoryHelper.html),
 that Flink offers which ensures that required options are set and that no 
unknown options are provided.
+
+Finally, you need to register your factory for Java's Service Provider 
Interfaces (SPI). Classes that implement this interface can be discovered and 
should be added to this file 
`src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory` 
with the fully classified class name of your factory:
+
+```java
+// if you created your class in the package org.example.acme, it should be 
named the following:
+org.example.acme.ImapTableSourceFactory
+```
+
+# Test the custom connector
+
+You should now have a working source connector. If you are following along 
with the provided repository, you can test it by running:
+
+```sh
+$ cd testing/
+$ ./build_and_run.sh
+```
+
+This builds the connector, starts a Flink cluster, a [test email 
server](https://greenmail-mail-test.github.io/greenmail/) which you will need 
later), and the [SQL 
client](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/)
 for you. You may need to run this script as a super user. If successful, you 
should see the SQL CLI:
+
+![image](https://ci.apache.org/projects/flink/flink-docs-release-1.13/fig/sql_client_demo.gif)
+
+You can now create a table with your connector by executing the following 
statement with the SQL client:

Review comment:
       was there any link to docs and explanation what is SQL Client before?

##########
File path: _posts/2021-08-16-connector-table-sql-api-part1.md
##########
@@ -0,0 +1,234 @@
+---
+layout: post
+title:  "Implementing a custom source connector for Table API and SQL - Part 
One "
+date: 2021-08-18T00:00:00.000Z
+authors:
+- Ingo Buerk:
+  name: "Ingo Buerk"
+excerpt: 
+---
+
+{% toc %}
+
+# Introduction
+
+Apache Flink is a data processing engine that keeps 
[state](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/)
 locally in order to do computations but does not store data. This means that 
it does not include its own fault-tolerant storage component by default and 
relies on external systems to ingest and persist data. Connecting to external 
data input (**sources**) and external data storage (**sinks**) is achieved with 
interfaces called **connectors**.   
+
+Since connectors are such important components, Flink ships with [connectors 
for some popular 
systems](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/).
 But sometimes you may need to read in an uncommon data format and what Flink 
provides is not enough. This is why Flink also provides [APIs](#) for building 
custom connectors if you want to connect to a system that is not supported by 
an existing connector.   
+
+Once you have a source and a sink defined for Flink, you can use its 
declarative APIs (in the form of the [Table API and 
SQL](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/overview/))
 to execute queries for data analysis without modification to the underlying 
data.  
+
+The **Table API** has the same operations as **SQL** but it extends and 
improves SQL's functionality. It is named Table API because of its relational 
functions on tables: how to obtain a table, how to output a table, and how to 
perform query operation on the table.
+
+In this two-part tutorial, you will explore some of these APIs and concepts by 
implementing your own custom source connector for reading in data from a 
mailbox. You will use Flink to process an email inbox through the IMAP protocol 
and sort them out by subject to a sink. 
+
+Part one will focus on building a custom source connector and [part two](#) 
will focus on integrating it. 
+
+# Goals
+
+Part one of this tutorial will teach you how to build and run a custom source 
connector to be used with Table API and SQL, two high-level abstractions in 
Flink.
+
+You are encouraged to follow along with the code in this 
[repository](github.com/Airblader/blog-imap). It provides a boilerplate project 
that also comes with a bundled 
[docker-compose](https://docs.docker.com/compose/) setup that lets you easily 
run the connector. You can then try it out with Flink’s SQL client.
+
+
+# Prerequisites
+
+This tutorial assumes that you have some familiarity with Java and 
objected-oriented programming. 
+
+It would also be useful to have 
[docker-compose](https://docs.docker.com/compose/install/) installed on your 
system in order to use the script included in the repository that builds and 
runs the connector. 
+
+
+# Understand the infrastructure required for a connector
+
+In order to create a connector which works with Flink, you need:
+
+1. A _factory class_ (a blueprint for creating other objects) that tells Flink 
with which identifier (in this case, “imap”) our connector can be addressed, 
which configuration options it exposes, and how the connector can be 
instantiated. Since Flink uses the Java Service Provider Interface (SPI) to 
discover factories located in different modules, you will also need to add some 
configuration details.
+
+2. The _table source_ object as a specific instance of the connector during 
the planning stage. It tells Flink some information about this instance and how 
it can create the connector runtime implementation. There are also more 
advanced features, such as 
[abilities](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/abilities/package-summary.html),
 that can be implemented to improve connector performance.
+
+3. A _runtime implementation_ from the connector obtained during the planning 
stage. The runtime logic is implemented in Flink's core connector interfaces 
and does the actual work of producing rows of dynamic table data. The runtime 
instances are shipped to the Flink cluster. 
+
+Let us look at  this sequence (factory class → table source → runtime 
implementation) in reverse order.
+
+# Establish the runtime implementation of the connector
+
+You first need to have a source connector which can be used in Flink's runtime 
system, defining how data goes in and how it can be executed in the cluster. 
There are a few different interfaces available for implementing the actual 
source of the data and have it be discoverable in Flink.  
+
+For complex connectors, you may want to implement the [Source 
interface](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/Source.html)
 which gives you a lot of control. For simpler use cases, you can use the 
[SourceFunction 
interface](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html),
 which is the base interface for all stream data sources in Flink. There are 
already a few different implementations of SourceFunction interfaces for common 
use cases such as the 
[FromElementsFunction](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.html)
 class and the 
[RichSourceFunction](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/RichSourceFunction.html)
 class. You will use the latter.  
+
+`RichSourceFunction` is a base class for implementing a parallel data source 
that has access to context information and some lifecycle methods. There is a 
`run()` method inherited from the `SourceFunction` interface that you need to 
implement. It is invoked once and can be used to produce the data either once 
for a bounded result or within a loop for an unbounded stream.
+
+For example, to create a bounded data source, you could implement this method 
so that it reads all existing emails and then closes. To create an unbounded 
source, you could only look at new emails coming in while the source is active. 
You can also combine these behaviors and expose them through configuration 
options.
+
+When you first create the class and implement the interface, it should look 
something like this:
+
+```java
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.table.data.RowData;
+
+public class ImapSourceFunction extends RichSourceFunction<RowData> {
+  @Override
+  public void run(SourceContext<RowData> ctx) throws Exception {}
+
+  @Override
+  public void cancel() {}
+}
+```
+
+In the `run()` method, you get access to a 
[context](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.SourceContext.html)
 object inherited from the SourceFunction interface, which is a bridge to Flink 
and allows you to output data. Since the source does not produce any data yet, 
the next step is to make it produce some static data in order to test that the 
data flows correctly: 
+
+```java
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData
+
+public class ImapSourceFunction extends RichSourceFunction<RowData> {
+  @Override
+  public void run(SourceContext<RowData> ctx) throws Exception {
+      ctx.collect(GenericRowData.of(
+          StringData.fromString("Subject 1"), 
+          StringData.fromString("Hello, World!")
+      ));
+  }
+
+  @Override
+  public void cancel() {}
+}
+```
+
+// explain the collect function?
+
+You do not need to implement the `cancel()` method yet because the source 
finishes instantly. 
+
+
+# Create and configure a dynamic table source for the data stream
+
+[Dynamic 
tables](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/concepts/dynamic_tables/)
 are the core concept of Flink’s Table API and SQL support for streaming data 
and, like its name suggests, change over time. You can imagine a data stream 
being logically converted into a table that is constantly changing. For this 
tutorial, the emails that will be read in will be interpreted as a (source) 
table that is queryable. It can be viewed as a specific instance of a connector 
class. 
+
+You will now implement a 
[DynamicTableSource](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/DynamicTableSource.html)
 interface. There are two types of dynamic table sources: 
[ScanTableSource](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/ScanTableSource.html)
 and 
[LookupTableSource](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/LookupTableSource.html).
 Scan sources read the entire table on the external system while lookup sources 
look for specific rows based on keys. The former will fit the use case of this 
tutorial. 
+
+This is what a scan table source implementation would look like:
+
+```java
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+
+public class ImapTableSource implements ScanTableSource {
+  @Override
+  public ChangelogMode getChangelogMode() {
+    return ChangelogMode.insertOnly();
+  }
+
+  @Override
+  public ScanRuntimeProvider getScanRuntimeProvider(ScanContext ctx) {
+    boolean bounded = true;

Review comment:
       for a tutorial a local variable makes the code more readable

##########
File path: _posts/2021-08-16-connector-table-sql-api-part2.md
##########
@@ -0,0 +1,434 @@
+---
+layout: post
+title:  "Implementing a custom source connector for Table API and SQL - Part 
Two "
+date: 2021-08-18T00:00:00.000Z
+authors:
+- Ingo Buerk:
+  name: "Ingo Buerk"
+excerpt: 
+---
+
+{% toc %}
+
+# Introduction
+
+In [part one](#) of this tutorial, you learned how to build a custom source 
connector for Flink. In part two, you will learn how to integrate the connector 
with a test email inbox through the IMAP protocol, filter out emails, and 
execute [Flink SQL on the Ververica 
Platform](https://www.ververica.com/apache-flink-sql-on-ververica-platform). 
+
+# Goals
+
+Part two of the tutorial will teach you how to: 
+
+- integrate a source connector which connects to a mailbox using the IMAP 
protocol
+- use [Jakarta Mail](https://eclipse-ee4j.github.io/mail/), a Java library 
that can send and receive email via the IMAP protocol  
+- write [Flink 
SQL](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/overview/)
 and execute the queries in the Ververica Platform
+
+You are encouraged to follow along with the code in this 
[repository](github.com/Airblader/blog-imap). It provides a boilerplate project 
that also comes with a bundled 
[docker-compose](https://docs.docker.com/compose/) setup that lets you easily 
run the connector. You can then try it out with Flink’s SQL client.
+
+
+# Prerequisites
+
+This tutorial assumes that you have:
+
+- followed the steps outlined in [part one](#) of this tutorial
+- some familiarity with Java and objected-oriented programming
+
+
+# Understand how to fetch emails via the IMAP protocol
+
+Now that you have a working source connector that can run on Flink, it is time 
to connect to an email server via IMAP (an Internet protocol that allows email 
clients to retrieve messages from a mail server) so that Flink can process 
emails instead of test static data.  
+
+You will use [Jakarta Mail](https://eclipse-ee4j.github.io/mail/), a Java 
library that can be used to send and receive email via IMAP. For simplicity, 
authentication will use a plain username and password.
+
+This tutorial will focus more on how to implement a connector for Flink. If 
you want to learn more about the details of how IMAP or Jakarta Mail work, you 
are encouraged to explore a more extensive implementation at this 
[repository](github.com/Airblader/flink-connector-email). 
+
+In order to fetch emails, you will need to connect to the email server, 
register a listener for new emails and collect them whenever they arrive, and 
enter a loop to keep the connector running. 
+
+
+# Add configuration options - server information and credentials
+
+In order to connect to your IMAP server, you will need at least the following:
+
+- hostname (of the mail server)
+- port number
+- username
+- password
+
+You will start by creating a class to encapsulate the configuration options. 
You will make use of [Lombok](https://projectlombok.org/setup/maven) to help 
with some boilerplate code. By adding the `@Data` and `@Builder` annotations, 
Lombok will generate these for all the fields of the immutable class. 
+
+```java
+@Data
+@Builder
+public class ImapSourceOptions implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final String host;
+    private final Integer port;
+    private final String user;
+    private final String password;
+}
+```
+
+Now you can add an instance of this class to the `ImapSourceFunction` and 
`ImapTableSource` classes so it can be used there. Take note of the column 
names with which the table has been created. This will help later.
+
+// QUESTION: what would the column names be here??
+
+```java
+public class ImapSourceFunction extends RichSourceFunction<RowData> {
+    private final ImapSourceOptions options;
+    private final List<String> columnNames;
+
+    public ImapSourceFunction(
+        ImapSourceOptions options, 
+        List<String> columnNames
+    ) {
+        this.options = options;
+        this.columnNames = columnNames.stream()
+            .map(String::toUpperCase)
+            .collect(Collectors.toList());
+    }
+
+    // ...
+}
+```
+
+```java
+public class ImapTableSource implements ScanTableSource {
+
+    private final ImapSourceOptions options;
+    private final List<String> columnNames;
+
+    public ImapTableSource(
+        ImapSourceOptions options,
+        List<String> columnNames
+    ) {
+        this.options = options;
+        this.columnNames = columnNames;
+    }
+
+    // …
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext ctx) {
+        final ImapSourceFunction sourceFunction = new 
ImapSourceFunction(options, columnNames);
+        return SourceFunctionProvider.of(sourceFunction, true);
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        return new ImapTableSource(options, columnNames);
+    }
+
+    // …
+}
+```
+
+Finally, in the `ImapTableSourceFactory` class, you need to create a 
`ConfigOption<Type>Name` for the hostname, port number, username, and password. 
 Then you need to report them to Flink. Since all of the current options are 
mandatory, you can add them to the `requiredOptions()` method in order to do 
this. 
+
+```java
+public class ImapTableSourceFactory implements DynamicTableSourceFactory {
+
+    public static final ConfigOption<String> HOST = 
ConfigOptions.key("host").stringType().noDefaultValue();
+    public static final ConfigOption<Integer> PORT = 
ConfigOptions.key("port").intType().noDefaultValue();
+    public static final ConfigOption<String> USER = 
ConfigOptions.key("user").stringType().noDefaultValue();
+    public static final ConfigOption<String> PASSWORD = 
ConfigOptions.key("password").stringType().noDefaultValue();
+
+    // …
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(HOST);
+        options.add(PORT);
+        options.add(USER);
+        options.add(PASSWORD);
+        return options;
+    }
+
+    // …
+}
+```
+
+Now take a look at the `createDynamicTableSource()` function in the 
`ImapTableSouceFactory` class.  Recall that previously (in part one) you had 
created a small helper utility 
[TableFactoryHelper](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/factories/FactoryUtil.TableFactoryHelper.html),
 that Flink offers which ensures that required options are set and that no 
unknown options are provided. You can now use it to automatically make sure 
that the required options of hostname, port number, username, and password are 
all provided when creating a table using this connector. The helper function 
will throw an error message if one required option is missing. You can also use 
it to access the provided options (`getOptions()`), convert them into an 
instance of the `ImapTableSource` class created earlier, and provide the 
instance to the table source:
+
+// why would you want to do the latter??
+
+```java
+public class ImapTableSourceFactory implements DynamicTableSourceFactory {
+
+    // ...
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context ctx) {
+        final FactoryUtil.TableFactoryHelper factoryHelper = 
FactoryUtil.createTableFactoryHelper(this, ctx);
+        factoryHelper.validate();
+
+        final ImapSourceOptions options = ImapSourceOptions.builder()
+            .host(factoryHelper.getOptions().get(HOST))
+            .port(factoryHelper.getOptions().get(PORT))
+            .user(factoryHelper.getOptions().get(USER))
+            .password(factoryHelper.getOptions().get(PASSWORD))
+            .build();
+        final List<String> columnNames = 
ctx.getCatalogTable().getResolvedSchema().getColumnNames();

Review comment:
       actually we should use `toPhysicalDataType` to erase computed columns 
and time attributes, but the problem in this tutorial is actually that it mixes 
the concepts of metadata columns and physical columns. so it uses physical 
columns as a replacement for metadata columns. `(RowType) 
ctx.getCatalogTable().getResolvedSchema().toPhysicalRowDataType().getLogicalType().getFieldNames()`
 but this is not very nice for an example :(

##########
File path: _posts/2021-08-16-connector-table-sql-api-part1.md
##########
@@ -0,0 +1,234 @@
+---
+layout: post
+title:  "Implementing a custom source connector for Table API and SQL - Part 
One "
+date: 2021-08-18T00:00:00.000Z
+authors:
+- Ingo Buerk:
+  name: "Ingo Buerk"
+excerpt: 
+---
+
+{% toc %}
+
+# Introduction
+
+Apache Flink is a data processing engine that keeps 
[state](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/)
 locally in order to do computations but does not store data. This means that 
it does not include its own fault-tolerant storage component by default and 
relies on external systems to ingest and persist data. Connecting to external 
data input (**sources**) and external data storage (**sinks**) is achieved with 
interfaces called **connectors**.   
+
+Since connectors are such important components, Flink ships with [connectors 
for some popular 
systems](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/).
 But sometimes you may need to read in an uncommon data format and what Flink 
provides is not enough. This is why Flink also provides [APIs](#) for building 
custom connectors if you want to connect to a system that is not supported by 
an existing connector.   

Review comment:
       ```suggestion
   Since connectors are such important components, Flink ships with [connectors 
for some popular 
systems](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/).
 But sometimes you may need to read in an uncommon data format and what Flink 
provides is not enough. This is why Flink also provides [extension 
points](??????) for building custom connectors if you want to connect to a 
system that is not supported by an existing connector.   
   ```

##########
File path: _posts/2021-08-16-connector-table-sql-api-part1.md
##########
@@ -0,0 +1,234 @@
+---
+layout: post
+title:  "Implementing a custom source connector for Table API and SQL - Part 
One "
+date: 2021-08-18T00:00:00.000Z
+authors:
+- Ingo Buerk:
+  name: "Ingo Buerk"
+excerpt: 
+---
+
+{% toc %}
+
+# Introduction
+
+Apache Flink is a data processing engine that keeps 
[state](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/)
 locally in order to do computations but does not store data. This means that 
it does not include its own fault-tolerant storage component by default and 
relies on external systems to ingest and persist data. Connecting to external 
data input (**sources**) and external data storage (**sinks**) is achieved with 
interfaces called **connectors**.   
+
+Since connectors are such important components, Flink ships with [connectors 
for some popular 
systems](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/).
 But sometimes you may need to read in an uncommon data format and what Flink 
provides is not enough. This is why Flink also provides [APIs](#) for building 
custom connectors if you want to connect to a system that is not supported by 
an existing connector.   
+
+Once you have a source and a sink defined for Flink, you can use its 
declarative APIs (in the form of the [Table API and 
SQL](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/overview/))
 to execute queries for data analysis without modification to the underlying 
data.  
+
+The **Table API** has the same operations as **SQL** but it extends and 
improves SQL's functionality. It is named Table API because of its relational 
functions on tables: how to obtain a table, how to output a table, and how to 
perform query operation on the table.
+
+In this two-part tutorial, you will explore some of these APIs and concepts by 
implementing your own custom source connector for reading in data from a 
mailbox. You will use Flink to process an email inbox through the IMAP protocol 
and sort them out by subject to a sink. 
+
+Part one will focus on building a custom source connector and [part two](#) 
will focus on integrating it. 
+
+# Goals
+
+Part one of this tutorial will teach you how to build and run a custom source 
connector to be used with Table API and SQL, two high-level abstractions in 
Flink.
+
+You are encouraged to follow along with the code in this 
[repository](github.com/Airblader/blog-imap). It provides a boilerplate project 
that also comes with a bundled 
[docker-compose](https://docs.docker.com/compose/) setup that lets you easily 
run the connector. You can then try it out with Flink’s SQL client.
+
+
+# Prerequisites
+
+This tutorial assumes that you have some familiarity with Java and 
objected-oriented programming. 
+
+It would also be useful to have 
[docker-compose](https://docs.docker.com/compose/install/) installed on your 
system in order to use the script included in the repository that builds and 
runs the connector. 
+
+
+# Understand the infrastructure required for a connector
+
+In order to create a connector which works with Flink, you need:
+
+1. A _factory class_ (a blueprint for creating other objects) that tells Flink 
with which identifier (in this case, “imap”) our connector can be addressed, 
which configuration options it exposes, and how the connector can be 
instantiated. Since Flink uses the Java Service Provider Interface (SPI) to 
discover factories located in different modules, you will also need to add some 
configuration details.
+
+2. The _table source_ object as a specific instance of the connector during 
the planning stage. It tells Flink some information about this instance and how 
it can create the connector runtime implementation. There are also more 
advanced features, such as 
[abilities](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/abilities/package-summary.html),
 that can be implemented to improve connector performance.
+
+3. A _runtime implementation_ from the connector obtained during the planning 
stage. The runtime logic is implemented in Flink's core connector interfaces 
and does the actual work of producing rows of dynamic table data. The runtime 
instances are shipped to the Flink cluster. 
+
+Let us look at  this sequence (factory class → table source → runtime 
implementation) in reverse order.
+
+# Establish the runtime implementation of the connector
+
+You first need to have a source connector which can be used in Flink's runtime 
system, defining how data goes in and how it can be executed in the cluster. 
There are a few different interfaces available for implementing the actual 
source of the data and have it be discoverable in Flink.  
+
+For complex connectors, you may want to implement the [Source 
interface](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/Source.html)
 which gives you a lot of control. For simpler use cases, you can use the 
[SourceFunction 
interface](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html),
 which is the base interface for all stream data sources in Flink. There are 
already a few different implementations of SourceFunction interfaces for common 
use cases such as the 
[FromElementsFunction](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.html)
 class and the 
[RichSourceFunction](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/RichSourceFunction.html)
 class. You will use the latter.  
+
+`RichSourceFunction` is a base class for implementing a parallel data source 
that has access to context information and some lifecycle methods. There is a 
`run()` method inherited from the `SourceFunction` interface that you need to 
implement. It is invoked once and can be used to produce the data either once 
for a bounded result or within a loop for an unbounded stream.
+
+For example, to create a bounded data source, you could implement this method 
so that it reads all existing emails and then closes. To create an unbounded 
source, you could only look at new emails coming in while the source is active. 
You can also combine these behaviors and expose them through configuration 
options.
+
+When you first create the class and implement the interface, it should look 
something like this:
+
+```java
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.table.data.RowData;

Review comment:
       mention that we use internal data structures because that is required by 
the table runtime




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to