Airblader commented on a change in pull request #460:
URL: https://github.com/apache/flink-web/pull/460#discussion_r690278068
##########
File path: _posts/2021-08-16-connector-table-sql-api-part1.md
##########
@@ -0,0 +1,234 @@
+---
+layout: post
+title: "Implementing a custom source connector for Table API and SQL - Part
One "
+date: 2021-08-18T00:00:00.000Z
+authors:
+- Ingo Buerk:
+ name: "Ingo Buerk"
+excerpt:
+---
+
+{% toc %}
+
+# Introduction
+
+Apache Flink is a data processing engine that keeps
[state](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/)
locally in order to do computations but does not store data. This means that
it does not include its own fault-tolerant storage component by default and
relies on external systems to ingest and persist data. Connecting to external
data input (**sources**) and external data storage (**sinks**) is achieved with
interfaces called **connectors**.
+
+Since connectors are such important components, Flink ships with [connectors
for some popular
systems](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/).
But sometimes you may need to read in an uncommon data format, and what Flink provides out of the box is not enough. This is why Flink also provides [APIs](#) for building custom connectors in case you want to connect to a system that is not supported by an existing connector.
+
+Once you have a source and a sink defined for Flink, you can use its
declarative APIs (in the form of the [Table API and
SQL](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/overview/))
to execute queries for data analysis without modification to the underlying
data.
+
+The **Table API** offers the same operations as **SQL**, but it also extends and improves on SQL's functionality. It is named Table API because of its relational functions on tables: how to obtain a table, how to output a table, and how to perform query operations on a table.
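+
+To make this concrete, here is a minimal sketch of the two styles side by side. The table name `emails` and the surrounding setup are assumptions made only for this illustration:
+
+```java
+import static org.apache.flink.table.api.Expressions.$;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+
+public class TableApiVsSql {
+    public static void main(String[] args) {
+        TableEnvironment tableEnv = TableEnvironment.create(
+            EnvironmentSettings.newInstance().inStreamingMode().build());
+
+        // Table API: obtain a table and query it with relational methods.
+        // (Assumes a table named "emails" has already been registered.)
+        Table subjects = tableEnv.from("emails").select($("subject"));
+
+        // The same query expressed in SQL:
+        Table subjectsSql = tableEnv.sqlQuery("SELECT subject FROM emails");
+    }
+}
+```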
+
+In this two-part tutorial, you will explore some of these APIs and concepts by implementing your own custom source connector for reading in data from a mailbox. You will use Flink to read emails from an inbox through the IMAP protocol and sort them by subject into a sink.
+
+Part one will focus on building a custom source connector and [part two](#)
will focus on integrating it.
+
+# Goals
+
+Part one of this tutorial will teach you how to build and run a custom source
connector to be used with Table API and SQL, two high-level abstractions in
Flink.
+
+You are encouraged to follow along with the code in this [repository](https://github.com/Airblader/blog-imap). It provides a boilerplate project
that also comes with a bundled
[docker-compose](https://docs.docker.com/compose/) setup that lets you easily
run the connector. You can then try it out with Flink’s SQL client.
+
+
+# Prerequisites
+
+This tutorial assumes that you have some familiarity with Java and object-oriented programming.
+
+It would also be useful to have
[docker-compose](https://docs.docker.com/compose/install/) installed on your
system in order to use the script included in the repository that builds and
runs the connector.
+
+
+# Understand the infrastructure required for a connector
+
+In order to create a connector which works with Flink, you need:
+
+1. A _factory class_ (a blueprint for creating other objects) that tells Flink with which identifier (in this case, “imap”) our connector can be addressed, which configuration options it exposes, and how the connector can be instantiated. Since Flink uses the Java Service Provider Interface (SPI) to discover factories located in different modules, you will also need to add some configuration details (see the sketch after this list).
+
+2. The _table source_ object as a specific instance of the connector during
the planning stage. It tells Flink some information about this instance and how
it can create the connector runtime implementation. There are also more
advanced features, such as
[abilities](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/abilities/package-summary.html),
that can be implemented to improve connector performance.
+
+3. A _runtime implementation_ from the connector obtained during the planning
stage. The runtime logic is implemented in Flink's core connector interfaces
and does the actual work of producing rows of dynamic table data. The runtime
instances are shipped to the Flink cluster.
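+
+As a preview of those configuration details: the SPI discovery from the first step boils down to a plain-text provider file in your jar, located at `src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory`. A minimal sketch of its contents, using a hypothetical package and class name for the factory:
+
+```
+# The fully qualified name of the factory class (hypothetical name shown here).
+com.example.imap.ImapTableSourceFactory
+```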
+
+Let us look at this sequence (factory class → table source → runtime
implementation) in reverse order.
+
+# Establish the runtime implementation of the connector
+
+You first need a source connector that can be used in Flink's runtime system, defining how data comes in and how it can be executed in the cluster. There are a few different interfaces available for implementing the actual source of the data and for making it discoverable in Flink.
+
+For complex connectors, you may want to implement the [Source interface](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/Source.html), which gives you a lot of control. For simpler use cases, you can use the [SourceFunction interface](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html), which is the base interface for all stream data sources in Flink. There are already a few implementations of the SourceFunction interface for common use cases, such as the [FromElementsFunction](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.html) class and the [RichSourceFunction](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/RichSourceFunction.html) class. You will use the latter.
+
+`RichSourceFunction` is a base class for implementing a parallel data source
that has access to context information and some lifecycle methods. There is a
`run()` method inherited from the `SourceFunction` interface that you need to
implement. It is invoked once and can be used to produce the data either once
for a bounded result or within a loop for an unbounded stream.
+
+For example, to create a bounded data source, you could implement this method
so that it reads all existing emails and then closes. To create an unbounded
source, you could only look at new emails coming in while the source is active.
You can also combine these behaviors and expose them through configuration
options.
+
+When you first create the class and implement the interface, it should look
something like this:
+
+```java
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.table.data.RowData;
+
+public class ImapSourceFunction extends RichSourceFunction<RowData> {
+    @Override
+    public void run(SourceContext<RowData> ctx) throws Exception {}
+
+    @Override
+    public void cancel() {}
+}
+```
+
+In the `run()` method, you get access to a
[context](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.SourceContext.html)
object inherited from the SourceFunction interface, which is a bridge to Flink
and allows you to output data. Since the source does not produce any data yet,
the next step is to make it produce some static data in order to test that the
data flows correctly:
+
+```java
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+
+public class ImapSourceFunction extends RichSourceFunction<RowData> {
+    @Override
+    public void run(SourceContext<RowData> ctx) throws Exception {
+        ctx.collect(GenericRowData.of(
+            StringData.fromString("Subject 1"),
+            StringData.fromString("Hello, World!")
+        ));
+    }
+
+    @Override
+    public void cancel() {}
+}
+```
+
+The `collect()` method of the source context takes a record and emits it into the data stream. Here, it emits a single row with two string fields, which will later correspond to the subject and the content of an email.
+
+You do not need to implement the `cancel()` method yet because the source
finishes instantly.
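+
+For an unbounded variant, however, `cancel()` matters: it has to break out of the production loop in `run()`. A minimal sketch of what that could look like, with the actual email-polling logic left out:
+
+```java
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.table.data.RowData;
+
+public class ImapSourceFunction extends RichSourceFunction<RowData> {
+    // Volatile so that the change made by cancel() is visible to run().
+    private volatile boolean running = true;
+
+    @Override
+    public void run(SourceContext<RowData> ctx) throws Exception {
+        while (running) {
+            // Poll for new emails and emit them via ctx.collect(...).
+        }
+    }
+
+    @Override
+    public void cancel() {
+        running = false;
+    }
+}
+```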
+
+
+# Create and configure a dynamic table source for the data stream
+
+[Dynamic tables](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/concepts/dynamic_tables/) are the core concept of Flink’s Table API and SQL support for streaming data and, as the name suggests, they change over time. You can imagine a data stream being logically converted into a table that is constantly changing. For this tutorial, the emails that are read in will be interpreted as a queryable (source) table. This table can be viewed as a specific instance of a connector class.
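+
+To give a feel for where this is heading (the actual registration is the topic of part two), such a table could eventually be declared against this connector as sketched below. The schema is an assumption made for illustration only:
+
+```java
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+
+public class RegisterImapTable {
+    public static void main(String[] args) {
+        TableEnvironment tableEnv = TableEnvironment.create(
+            EnvironmentSettings.newInstance().inStreamingMode().build());
+
+        // Hypothetical schema: the options and columns a table may declare are
+        // ultimately defined by the connector's factory, covered in part two.
+        tableEnv.executeSql(
+            "CREATE TABLE inbox (subject STRING, content STRING) "
+                + "WITH ('connector' = 'imap')");
+    }
+}
+```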
+
+You will now implement a
[DynamicTableSource](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/DynamicTableSource.html)
interface. There are two types of dynamic table sources:
[ScanTableSource](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/ScanTableSource.html)
and
[LookupTableSource](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/LookupTableSource.html).
Scan sources read the entire table on the external system while lookup sources
look for specific rows based on keys. The former will fit the use case of this
tutorial.
+
+This is what a scan table source implementation would look like:
+
+```java
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+
+public class ImapTableSource implements ScanTableSource {
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.insertOnly();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext ctx) {
+        boolean bounded = true;
Review comment:
Hm, I guess you extracted this `true` to give it a name, but as a
developer, this local variable looks useless and a code review would probably
point that out. IDEs will typically name the argument for you anyway, i.e.
`of(…, true)` will be shown as `of(…, bounded: true)`.