Airblader commented on a change in pull request #460:
URL: https://github.com/apache/flink-web/pull/460#discussion_r691166021
##########
File path: _posts/2021-08-16-connector-table-sql-api-part2.md
##########
@@ -0,0 +1,434 @@
+---
+layout: post
+title: "Implementing a custom source connector for Table API and SQL - Part Two"
+date: 2021-08-18T00:00:00.000Z
+authors:
+- Ingo Buerk:
+  name: "Ingo Buerk"
+excerpt:
+---
+
+{% toc %}
+
+# Introduction
+
+In [part one](#) of this tutorial, you learned how to build a custom source connector for Flink. In part two, you will learn how to integrate the connector with a test email inbox through the IMAP protocol, filter out emails, and execute [Flink SQL on the Ververica Platform](https://www.ververica.com/apache-flink-sql-on-ververica-platform).
+
+# Goals
+
+Part two of the tutorial will teach you how to:
+
+- integrate a source connector that connects to a mailbox using the IMAP protocol
+- use [Jakarta Mail](https://eclipse-ee4j.github.io/mail/), a Java library for sending and receiving email that supports the IMAP protocol
+- write [Flink SQL](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/overview/) and execute the queries on the Ververica Platform
+
+You are encouraged to follow along with the code in this [repository](https://github.com/Airblader/blog-imap). It provides a boilerplate project with a bundled [docker-compose](https://docs.docker.com/compose/) setup that lets you easily run the connector. You can then try it out with Flink’s SQL client.
+
+
+# Prerequisites
+
+This tutorial assumes that you have:
+
+- followed the steps outlined in [part one](#) of this tutorial
+- some familiarity with Java and object-oriented programming
+
+
+# Understand how to fetch emails via the IMAP protocol
+
+Now that you have a working source connector that can run on Flink, it is time to connect to an email server via IMAP (an Internet protocol that allows email clients to retrieve messages from a mail server) so that Flink can process emails instead of static test data.
+
+You will use [Jakarta Mail](https://eclipse-ee4j.github.io/mail/), a Java library for sending and receiving email, to fetch messages via IMAP. For simplicity, authentication will use a plain username and password.
+
+This tutorial focuses on how to implement a connector for Flink. If you want to learn more about how IMAP or Jakarta Mail work, you are encouraged to explore a more extensive implementation in this [repository](https://github.com/Airblader/flink-connector-email).
+
+In order to fetch emails, you will need to connect to the email server, register a listener for new emails and collect them whenever they arrive, and enter a loop to keep the connector running.
+
+
+# Add configuration options - server information and credentials
+
+In order to connect to your IMAP server, you will need at least the following:
+
+- hostname (of the mail server)
+- port number
+- username
+- password
+
+You will start by creating a class to encapsulate the configuration options. You will make use of [Lombok](https://projectlombok.org/setup/maven) to avoid some boilerplate code: the `@Data` annotation generates getters as well as `equals()`, `hashCode()`, and `toString()`, and `@Builder` generates a builder for all the fields of this immutable class.
+
+```java
+@Data
+@Builder
+public class ImapSourceOptions implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final String host;
+    private final Integer port;
+    private final String user;
+    private final String password;
+}
+```
+
+Now you can add an instance of this class to the `ImapSourceFunction` and `ImapTableSource` classes so it can be used there. Both classes also receive the names of the columns declared when the table was created (for example, in a `CREATE TABLE` statement); you will need them later.
+
+```java
+public class ImapSourceFunction extends RichSourceFunction<RowData> {
+    private final ImapSourceOptions options;
+    private final List<String> columnNames;
+
+    public ImapSourceFunction(
+            ImapSourceOptions options,
+            List<String> columnNames
+    ) {
+        this.options = options;
+        this.columnNames = columnNames.stream()
+                .map(String::toUpperCase)
+                .collect(Collectors.toList());
+    }
+
+    // ...
+}
+```
+
+```java
+public class ImapTableSource implements ScanTableSource {
+
+    private final ImapSourceOptions options;
+    private final List<String> columnNames;
+
+    public ImapTableSource(
+            ImapSourceOptions options,
+            List<String> columnNames
+    ) {
+        this.options = options;
+        this.columnNames = columnNames;
+    }
+
+    // …
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext ctx) {
+        final ImapSourceFunction sourceFunction = new ImapSourceFunction(options, columnNames);
+        // The mailbox is watched continuously in a loop, so the source is unbounded.
+        return SourceFunctionProvider.of(sourceFunction, false);
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        return new ImapTableSource(options, columnNames);
+    }
+
+    // …
+}
+```
+
+Finally, in the `ImapTableSourceFactory` class, you need to create a `ConfigOption` for each of the hostname, port number, username, and password, and then report them to Flink. Since all of these options are mandatory, you can do this by returning them from the `requiredOptions()` method.
+
+```java
+public class ImapTableSourceFactory implements DynamicTableSourceFactory {
+
+    public static final ConfigOption<String> HOST =
+            ConfigOptions.key("host").stringType().noDefaultValue();
+    public static final ConfigOption<Integer> PORT =
+            ConfigOptions.key("port").intType().noDefaultValue();
+    public static final ConfigOption<String> USER =
+            ConfigOptions.key("user").stringType().noDefaultValue();
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password").stringType().noDefaultValue();
+
+    // …
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(HOST);
+        options.add(PORT);
+        options.add(USER);
+        options.add(PASSWORD);
+        return options;
+    }
+
+    // …
+}
+```
+
+Now take a look at the `createDynamicTableSource()` function in the `ImapTableSourceFactory` class. Recall that in part one you created a [TableFactoryHelper](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/factories/FactoryUtil.TableFactoryHelper.html), a small helper utility offered by Flink which ensures that required options are set and that no unknown options are provided. You can now use it to automatically make sure that the hostname, port number, username, and password are all provided when creating a table using this connector; if a required option is missing, the helper throws an exception with a descriptive error message. You can also use it to access the provided options (`getOptions()`), collect them into an instance of the `ImapSourceOptions` class created earlier, and pass that instance to the `ImapTableSource`, which in turn hands it to the source function that connects to the mail server:
+
+```java
+public class ImapTableSourceFactory implements DynamicTableSourceFactory {
+
+    // ...
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context ctx) {
+        final FactoryUtil.TableFactoryHelper factoryHelper =
+                FactoryUtil.createTableFactoryHelper(this, ctx);
+        factoryHelper.validate();
+
+        final ImapSourceOptions options = ImapSourceOptions.builder()
+                .host(factoryHelper.getOptions().get(HOST))
+                .port(factoryHelper.getOptions().get(PORT))
+                .user(factoryHelper.getOptions().get(USER))
+                .password(factoryHelper.getOptions().get(PASSWORD))
+                .build();
+        final List<String> columnNames =
+                ctx.getCatalogTable().getResolvedSchema().getColumnNames();
Review comment:
I discussed this offline with @twalthr. In the interest of finding a balance between "correctness" and brevity, let's change it to the following:
```java
final List<String> columnNames =
        ctx.getCatalogTable().getResolvedSchema().getColumns().stream()
                .filter(Column::isPhysical)
                .map(Column::getName)
                .collect(Collectors.toList());
```
Additionally, we should explain (with a link) that ideally we would make use of connector metadata here instead, but that we keep it simple for this post, and perhaps refer again to github.com/TNG/flink-connector-email, which now implements this using metadata.
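To make the effect of the filter concrete, here is a minimal plain-Java sketch without any Flink dependency. `SimpleColumn` is a hypothetical stand-in for Flink's `Column` and models only the `isPhysical()` and `getName()` methods used in the snippet above; the column names are made up for illustration. Computed and metadata columns are not physical, so they are dropped and only the names of columns the source itself has to produce remain.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PhysicalColumnFilterSketch {

    // Hypothetical stand-in for Flink's Column class; it only models
    // whether a column is physical and what it is called.
    static final class SimpleColumn {
        private final String name;
        private final boolean physical;

        SimpleColumn(String name, boolean physical) {
            this.name = name;
            this.physical = physical;
        }

        boolean isPhysical() {
            return physical;
        }

        String getName() {
            return name;
        }
    }

    // Same pipeline as the suggested change: keep physical columns only,
    // then map each remaining column to its name.
    static List<String> physicalColumnNames(List<SimpleColumn> columns) {
        return columns.stream()
                .filter(SimpleColumn::isPhysical)
                .map(SimpleColumn::getName)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<SimpleColumn> schema = Arrays.asList(
                new SimpleColumn("subject", true),
                new SimpleColumn("sent", true),
                // e.g. a computed column declared as `subject_upper AS UPPER(subject)`
                new SimpleColumn("subject_upper", false));
        System.out.println(physicalColumnNames(schema)); // prints [subject, sent]
    }
}
```

With the unfiltered `getColumnNames()`, `subject_upper` would also be handed to the source function, even though the planner, not the source, computes it.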