This is an automated email from the ASF dual-hosted git repository. lresende pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/bahir-website.git
commit ef547fddc9b6977cb8ba59597f0ab94cf83ec004 Author: Luciano Resende <[email protected]> AuthorDate: Mon Dec 14 17:37:17 2020 -0800 Update documentation for Flink extensions --- site/docs/flink/current/flink-streaming-kudu.md | 315 ++++++++++++++++++----- site/docs/flink/current/flink-streaming-redis.md | 6 +- 2 files changed, 259 insertions(+), 62 deletions(-) diff --git a/site/docs/flink/current/flink-streaming-kudu.md b/site/docs/flink/current/flink-streaming-kudu.md index 2eef38c..2af5e9a 100644 --- a/site/docs/flink/current/flink-streaming-kudu.md +++ b/site/docs/flink/current/flink-streaming-kudu.md @@ -27,8 +27,12 @@ limitations under the License. # Flink Kudu Connector -This connector provides a source (```KuduInputFormat```) and a sink/output (```KuduSink``` and ```KuduOutputFormat```, respectively) that can read and write to [Kudu](https://kudu.apache.org/). To use this connector, add the -following dependency to your project: +This connector provides a source (```KuduInputFormat```), a sink/output +(```KuduSink``` and ```KuduOutputFormat```, respectively), + as well a table source (`KuduTableSource`), an upsert table sink (`KuduTableSink`), and a catalog (`KuduCatalog`), + to allow reading and writing to [Kudu](https://kudu.apache.org/). + +To use this connector, add the following dependency to your project: <dependency> <groupId>org.apache.bahir</groupId> @@ -36,90 +40,283 @@ following dependency to your project: <version>1.1-SNAPSHOT</version> </dependency> -*Version Compatibility*: This module is compatible with Apache Kudu *1.7.1* (last stable version). + *Version Compatibility*: This module is compatible with Apache Kudu *1.11.1* (last stable version) and Apache Flink 1.10.+. Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution. -See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-stable/start/dependencies.html). +See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/projectsetup/dependencies.html). ## Installing Kudu Follow the instructions from the [Kudu Installation Guide](https://kudu.apache.org/docs/installation.html). -Optionally, you can use the docker images provided in dockers folder. +Optionally, you can use the docker images provided in dockers folder. + +## SQL and Table API + +The Kudu connector is fully integrated with the Flink Table and SQL APIs. Once we configure the Kudu catalog (see next section) +we can start querying or inserting into existing Kudu tables using the Flink SQL or Table API. + +For more information about the possible queries please check the [official documentation](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/) -## KuduInputFormat +### Kudu Catalog +The connector comes with a catalog implementation to handle metadata about your Kudu setup and perform table management. +By using the Kudu catalog, you can access all the tables already created in Kudu from Flink SQL queries. The Kudu catalog only +allows users to create or access existing Kudu tables. Tables using other data sources must be defined in other catalogs such as +in-memory catalog or Hive catalog. + +When using the SQL CLI you can easily add the Kudu catalog to your environment yaml file: + +``` +catalogs: + - name: kudu + type: kudu + kudu.masters: <host>:7051 ``` -ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); -env.setParallelism(PARALLELISM); +Once the SQL CLI is started you can simply switch to the Kudu catalog by calling `USE CATALOG kudu;` -// create a table info object -KuduTableInfo tableInfo = KuduTableInfo.Builder - .create("books") - .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build()) - .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build()) - .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build()) - .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build()) - .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build()) - .build(); - -// Pass the tableInfo to the KuduInputFormat and provide kuduMaster ips -env.createInput(new KuduInputFormat<>("172.25.0.6", tableInfo)) - .count(); - -env.execute(); +You can also create and use the KuduCatalog directly in the Table environment: + +```java +String KUDU_MASTERS="host1:port1,host2:port2" +KuduCatalog catalog = new KuduCatalog(KUDU_MASTERS); +tableEnv.registerCatalog("kudu", catalog); +tableEnv.useCatalog("kudu"); ``` -## KuduOutputFormat +### DDL operations using SQL + +It is possible to manipulate Kudu tables using SQL DDL. + +When not using the Kudu catalog, the following additional properties must be specified in the `WITH` clause: +* `'connector.type'='kudu'` +* `'kudu.masters'='host1:port1,host2:port2,...'`: comma-delimitered list of Kudu masters +* `'kudu.table'='...'`: The table's name within the Kudu database. + +If you have registered and are using the Kudu catalog, these properties are handled automatically. + +To create a table, the additional properties `kudu.primary-key-columns` and `kudu.hash-columns` must be specified +as comma-delimited lists. Optionally, you can set the `kudu.replicas` property (defaults to 1). +Other properties, such as range partitioning, cannot be configured here - for more flexibility, please use +`catalog.createTable` as described in [this](#Creating-a-KuduTable-directly-with-KuduCatalog) section or create the table directly in Kudu. + +The `NOT NULL` constraint can be added to any of the column definitions. +By setting a column as a primary key, it will automatically by created with the `NOT NULL` constraint. +Hash columns must be a subset of primary key columns. + +Kudu Catalog ``` -ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); +CREATE TABLE TestTable ( + first STRING, + second STRING, + third INT NOT NULL +) WITH ( + 'kudu.hash-columns' = 'first', + 'kudu.primary-key-columns' = 'first,second' +) +``` -env.setParallelism(PARALLELISM); +Other catalogs +``` +CREATE TABLE TestTable ( + first STRING, + second STRING, + third INT NOT NULL +) WITH ( + 'connector.type' = 'kudu', + 'kudu.masters' = '...', + 'kudu.table' = 'TestTable', + 'kudu.hash-columns' = 'first', + 'kudu.primary-key-columns' = 'first,second' +) +``` -// create a table info object -KuduTableInfo tableInfo = KuduTableInfo.Builder - .create("books") - .createIfNotExist(true) - .replicas(1) - .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build()) - .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build()) - .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build()) - .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build()) - .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build()) - .build(); +Renaming a table: +``` +ALTER TABLE TestTable RENAME TO TestTableRen +``` + +Dropping a table: +```sql +DROP TABLE TestTableRen +``` -... +#### Creating a KuduTable directly with KuduCatalog -env.fromCollection(books) - .output(new KuduOutputFormat<>("172.25.0.6", tableInfo)); +The KuduCatalog also exposes a simple `createTable` method that required only the where table configuration, +including schema, partitioning, replication, etc. can be specified using a `KuduTableInfo` object. -env.execute(); +Use the `createTableIfNotExists` method, that takes a `ColumnSchemasFactory` and +a `CreateTableOptionsFactory` parameter, that implement respectively `getColumnSchemas()` +returning a list of Kudu [ColumnSchema](https://kudu.apache.org/apidocs/org/apache/kudu/ColumnSchema.html) objects; + and `getCreateTableOptions()` returning a +[CreateTableOptions](https://kudu.apache.org/apidocs/org/apache/kudu/client/CreateTableOptions.html) object. + +This example shows the creation of a table called `ExampleTable` with two columns, +`first` being a primary key; and configuration of replicas and hash partitioning. + +```java +KuduTableInfo tableInfo = KuduTableInfo + .forTable("ExampleTable") + .createTableIfNotExists( + () -> + Lists.newArrayList( + new ColumnSchema + .ColumnSchemaBuilder("first", Type.INT32) + .key(true) + .build(), + new ColumnSchema + .ColumnSchemaBuilder("second", Type.STRING) + .build() + ), + () -> new CreateTableOptions() + .setNumReplicas(1) + .addHashPartitions(Lists.newArrayList("first"), 2)); + +catalog.createTable(tableInfo, false); ``` +The example uses lambda expressions to implement the functional interfaces. + +Read more about Kudu schema design in the [Kudu docs](https://kudu.apache.org/docs/schema_design.html). + +### Supported data types +| Flink/SQL | Kudu | +| ------------- |:-------------:| +| STRING | STRING | +| BOOLEAN | BOOL | +| TINYINT | INT8 | +| SMALLINT | INT16 | +| INT | INT32 | +| BIGINT | INT64 | +| FLOAT | FLOAT | +| DOUBLE | DOUBLE | +| BYTES | BINARY | +| TIMESTAMP(3) | UNIXTIME_MICROS | + +Note: +* `TIMESTAMP`s are fixed to a precision of 3, and the corresponding Java conversion class is `java.sql.Timestamp` +* `BINARY` and `VARBINARY` are not yet supported - use `BYTES`, which is a `VARBINARY(2147483647)` +* `CHAR` and `VARCHAR` are not yet supported - use `STRING`, which is a `VARCHAR(2147483647)` +* `DECIMAL` types are not yet supported -## KuduSink +### Known limitations +* Data type limitations (see above). +* SQL Create table: primary keys can only be set by the `kudu.primary-key-columns` property, using the +`PRIMARY KEY` constraint is not yet possible. +* SQL Create table: range partitioning is not supported. +* When getting a table through the Catalog, NOT NULL and PRIMARY KEY constraints are ignored. All columns +are described as being nullable, and not being primary keys. +* Kudu tables cannot be altered through the catalog other than simple renaming +## DataStream API + +It is also possible to use the Kudu connector directly from the DataStream API however we +encourage all users to explore the Table API as it provides a lot of useful tooling when working +with Kudu data. + +### Reading tables into a DataStreams + +There are 2 main ways of reading a Kudu Table into a DataStream + 1. Using the `KuduCatalog` and the Table API + 2. Using the `KuduRowInputFormat` directly + +Using the `KuduCatalog` and Table API is the recommended way of reading tables as it automatically +guarantees type safety and takes care of configuration of our readers. + +This is how it works in practice: +```java +StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, tableSettings); + +tableEnv.registerCatalog("kudu", new KuduCatalog("master:port")); +tableEnv.useCatalog("kudu"); + +Table table = tableEnv.sqlQuery("SELECT * FROM MyKuduTable"); +DataStream<Row> rows = tableEnv.toAppendStream(table, Row.class); +``` + +The second way of achieving the same thing is by using the `KuduRowInputFormat` directly. +In this case we have to manually provide all information about our table: + +```java +KuduTableInfo tableInfo = ... +KuduReaderConfig readerConfig = ... +KuduRowInputFormat inputFormat = new KuduRowInputFormat(readerConfig, tableInfo); + +DataStream<Row> rowStream = env.createInput(inputFormat, rowTypeInfo); +``` + +At the end of the day the `KuduTableSource` is just a convenient wrapper around the `KuduRowInputFormat`. + +### Kudu Sink +The connector provides a `KuduSink` class that can be used to consume DataStreams +and write the results into a Kudu table. + +The constructor takes 3 or 4 arguments. + * `KuduWriterConfig` is used to specify the Kudu masters and the flush mode. + * `KuduTableInfo` identifies the table to be written + * `KuduOperationMapper` maps the records coming from the DataStream to a list of Kudu operations. + * `KuduFailureHandler` (optional): If you want to provide your own logic for handling writing failures. + +The example below shows the creation of a sink for Row type records of 3 fields. It Upserts each record. +It is assumed that a Kudu table with columns `col1, col2, col3` called `AlreadyExistingTable` exists. Note that if this were not the case, +we could pass a `KuduTableInfo` as described in the [Catalog - Creating a table](#creating-a-table) section, +and the sink would create the table with the provided configuration. + +```java +KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(KUDU_MASTERS).build(); + +KuduSink<Row> sink = new KuduSink<>( + writerConfig, + KuduTableInfo.forTable("AlreadyExistingTable"), + new RowOperationMapper<>( + new String[]{"col1", "col2", "col3"}, + AbstractSingleOperationMapper.KuduOperation.UPSERT) +) ``` -StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); -env.setParallelism(PARALLELISM); +#### KuduOperationMapper + +This section describes the Operation mapping logic in more detail. + +The connector supports insert, upsert, update, and delete operations. +The operation to be performed can vary dynamically based on the record. +To allow for more flexibility, it is also possible for one record to trigger +0, 1, or more operations. +For the highest level of control, implement the `KuduOperationMapper` interface. -// create a table info object -KuduTableInfo tableInfo = KuduTableInfo.Builder - .create("books") - .createIfNotExist(true) - .replicas(1) - .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build()) - .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build()) - .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build()) - .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build()) - .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build()) - .build(); +If one record from the DataStream corresponds to one table operation, +extend the `AbstractSingleOperationMapper` class. An array of column +names must be provided. This must match the Kudu table's schema. -... +The `getField` method must be overridden, which extracts the value for the table column whose name is +at the `i`th place in the `columnNames` array. +If the operation is one of (`CREATE, UPSERT, UPDATE, DELETE`) +and doesn't depend on the input record (constant during the life of the sink), it can be set in the constructor +of `AbstractSingleOperationMapper`. +It is also possible to implement your own logic by overriding the +`createBaseOperation` method that returns a Kudu [Operation](https://kudu.apache.org/apidocs/org/apache/kudu/client/Operation.html). -env.fromCollection(books) - .addSink(new KuduSink<>("172.25.0.6", tableInfo)); +There are pre-defined operation mappers for Pojo, Flink Row, and Flink Tuple types for constant operation, 1-to-1 sinks. +* `PojoOperationMapper`: Each table column must correspond to a POJO field +with the same name. The `columnNames` array should contain those fields of the POJO that +are present as table columns (the POJO fields can be a superset of table columns). +* `RowOperationMapper` and `TupleOperationMapper`: the mapping is based on position. The +`i`th field of the Row/Tuple corresponds to the column of the table at the `i`th +position in the `columnNames` array. -env.execute(); +## Building the connector + +The connector can be easily built by using maven: + +``` +cd bahir-flink +mvn clean install ``` + +### Running the tests + +The integration tests rely on the Kudu test harness which requires the current user to be able to ssh to localhost. + +This might not work out of the box on some operating systems (such as Mac OS X). +To solve this problem go to *System Preferences/Sharing* and enable Remote login for your user. diff --git a/site/docs/flink/current/flink-streaming-redis.md b/site/docs/flink/current/flink-streaming-redis.md index 1a2f8c2..0c646fb 100644 --- a/site/docs/flink/current/flink-streaming-redis.md +++ b/site/docs/flink/current/flink-streaming-redis.md @@ -105,7 +105,7 @@ This example code does the same, but for Redis Cluster: **Java:** - FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder() + FlinkJedisPoolConfig conf = new FlinkJedisClusterConfig.Builder() .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build(); DataStream<String> stream = ...; @@ -114,7 +114,7 @@ This example code does the same, but for Redis Cluster: **Scala:** - val conf = new FlinkJedisPoolConfig.Builder().setNodes(...).build() + val conf = new FlinkJedisClusterConfig.Builder().setNodes(...).build() stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper)) @@ -155,7 +155,7 @@ This section gives a description of all the available data types and what Redis </td> </tr> <tr> - <td>SET</td><td><a href="http://redis.io/commands/rpush">SADD</a></td> + <td>SET</td><td><a href="http://redis.io/commands/sadd">SADD</a></td> </tr> <tr> <td>PUBSUB</td><td><a href="http://redis.io/commands/publish">PUBLISH</a></td>
