This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git
The following commit(s) were added to refs/heads/main by this push:
     new 6f5ef7c  [FLINK-37527] Add `KuduSource` documentation

6f5ef7c is described below

commit 6f5ef7c7b559ab9d5aa612974283b7698fbc868c
Author: Ferenc Csaky <fcs...@apache.org>
AuthorDate: Fri Mar 21 10:42:38 2025 +0100

    [FLINK-37527] Add `KuduSource` documentation
---
 docs/content/docs/connectors/datastream/kudu.md | 105 ++++++++++++++++++------
 1 file changed, 80 insertions(+), 25 deletions(-)

diff --git a/docs/content/docs/connectors/datastream/kudu.md b/docs/content/docs/connectors/datastream/kudu.md
index 556822c..e479340 100644
--- a/docs/content/docs/connectors/datastream/kudu.md
+++ b/docs/content/docs/connectors/datastream/kudu.md
@@ -44,10 +44,11 @@ The current version of the connector is built with Kudu client version **1.17.1*
 
 ## Reading from Kudu
 
-There are two specific ways of reading a Kudu table into a `DataStream`:
+The connector provides the following ways of reading a Kudu table into a `DataStream`:
 
 1. Using the `KuduCatalog` and Table API programmatically.
-2. Using the `KuduRowInputFormat` directly.
+2. Using the `KuduSource` class.
+3. Using the `KuduRowInputFormat` directly.
 
 ### Kudu Catalog
 
@@ -70,6 +71,56 @@ Table table = tEnv.sqlQuery("SELECT * FROM MyKuduTable");
 DataStream<Row> ds = tEnv.toDataStream(table);
 ```
 
+### Kudu Source
+
+{{< hint info >}}
+This section describes how to use the Kudu Source, which is based on the new [data source]({{< ref "docs/dev/datastream/sources.md" >}}) API.
+{{< /hint >}}
+
+The Kudu Source provides a builder class that helps construct the source object.
+The code snippet below shows how to build a Kudu Source that reads data from an existing Kudu table.
+The `KuduReaderConfig` class configures the Kudu-specific options that control the read behavior.
+
+```java
+KuduTableInfo tableInfo = KuduTableInfo.forTable("my_kudu_table");
+KuduReaderConfig readerConfig = KuduReaderConfig.Builder
+        .setMasters("localhost:7051")
+        .build();
+
+KuduSource<Row> source =
+        KuduSource.<Row>builder()
+                .setTableInfo(tableInfo)
+                .setReaderConfig(readerConfig)
+                .setRowResultConverter(new RowResultRowConverter())
+                .build();
+
+env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kudu Source");
+```
+
+{{< hint info >}}
+It is also possible to let the connector create the Kudu table if it does not exist yet. To learn more, see the [Create Kudu table]({{< ref "docs/dev/connectors/datastream/kudu" >}}#create-kudu-table) section.
+{{< /hint >}}
+
+#### Boundedness
+
+Although a Kudu table is a bounded source, it can still be useful to read it in a streaming manner, where the job keeps running until it fails or is stopped/cancelled.
+By default, `KuduSource` runs in bounded mode; calling `.setBoundedness(Boundedness.CONTINUOUS_UNBOUNDED)` switches it to streaming mode.
+
+In `CONTINUOUS_UNBOUNDED` mode, the source behaves in a CDC-like manner: at job start, it takes a snapshot of the source table and records the snapshot time.
+From that point onward, the source periodically performs differential scans, so each scan only processes the changes made during the preceding period.
+The length of this period is set via `.setDiscoveryPeriod(Duration)`. The following example reads a Kudu table in a streaming fashion and picks up updates in 1-minute periods.
+
+```java
+KuduSource<Row> source =
+        KuduSource.<Row>builder()
+                .setTableInfo(...)
+                .setReaderConfig(...)
+                .setRowResultConverter(new RowResultRowConverter())
+                .setBoundedness(Boundedness.CONTINUOUS_UNBOUNDED)
+                .setDiscoveryPeriod(Duration.ofMinutes(1))
+                .build();
+```
+
 ### Kudu Row Input Format
 
 We can also create a `DataStream` by using the `KuduRowInputFormat` directly.
@@ -79,7 +130,7 @@ In this case we have to manually provide all information about our table:
 KuduTableInfo tableInfo = ...
 KuduReaderConfig readerConfig = ...
 KuduRowInputFormat inputFormat = new KuduRowInputFormat(readerConfig, tableInfo);
-DataStream<Row> ds = env.createInput(inputFormat);
+env.createInput(inputFormat);
 ```
 
 ## Writing to Kudu
@@ -114,28 +165,9 @@ KuduSink<Row> sink = KuduSink.<Row>builder()
 ds.sinkTo(sink);
 ```
 
-#### Create Kudu table
-
-If the Kudu table does not exist, we can pass the schema and configuration to `KuduTableInfo`, and the sink will try to create the table based on that.
-
-```java
-KuduTableInfo tableInfo = KuduTableInfo
-        .forTable("new-table")
-        .createTableIfNotExists(
-                () ->
-                        Arrays.asList(
-                                new ColumnSchema
-                                        .ColumnSchemaBuilder("first", Type.INT32)
-                                        .key(true)
-                                        .build(),
-                                new ColumnSchema
-                                        .ColumnSchemaBuilder("second", Type.STRING)
-                                        .build()
-                        ),
-                () -> new CreateTableOptions()
-                        .setNumReplicas(3)
-                        .addHashPartitions(Lists.newArrayList("first"), 2));
-```
+{{< hint info >}}
+It is also possible to let the connector create the Kudu table if it does not exist yet. To learn more, see the [Create Kudu table]({{< ref "docs/dev/connectors/datastream/kudu" >}}#create-kudu-table) section.
+{{< /hint >}}
 
 ### Kudu Operation Mapping
 
@@ -160,3 +192,26 @@ There are pre-defined operation mappers for POJO, Flink `Row`, and Flink `Tuple`
 * `RowOperationMapper`/`TupleOperationMapper`: The mapping is based on position. The `i`th field of the `Row`/`Tuple` corresponds to the column of the table at the `i`th position in the `columnNames` array.
 
+## Create Kudu table
+
+If a table does not exist on the Kudu side, you can pass the desired schema and configuration to `KuduTableInfo` via `createTableIfNotExists(...)`.
+Either the source or the sink will then try to create the table.
+ +```java +KuduTableInfo tableInfo = KuduTableInfo + .forTable("new-table") + .createTableIfNotExists( + () -> + Arrays.asList( + new ColumnSchema + .ColumnSchemaBuilder("first", Type.INT32) + .key(true) + .build(), + new ColumnSchema + .ColumnSchemaBuilder("second", Type.STRING) + .build() + ), + () -> new CreateTableOptions() + .setNumReplicas(3) + .addHashPartitions(Lists.newArrayList("first"), 2)); +```
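The `CONTINUOUS_UNBOUNDED` behavior described under `#### Boundedness` (one snapshot at job start, then a differential scan every discovery period) can be modeled as a chain of back-to-back time windows. The following is a minimal, self-contained sketch under that assumption: `DiffScanSchedule`, `Window`, and `windows(...)` are hypothetical names, the code calls neither Flink nor the Kudu client, and it is not the connector's actual implementation. It only illustrates how consecutive windows could tile the timeline so that no change falls between two scans.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical illustration of a snapshot-then-diff-scan schedule.
 * NOT the connector's implementation; it only models the scan time windows.
 */
public final class DiffScanSchedule {

    /** A half-open [start, end) interval whose changes one differential scan would read. */
    public record Window(Instant start, Instant end) {}

    private DiffScanSchedule() {}

    /**
     * Returns the first {@code count} windows that follow a snapshot taken at
     * {@code snapshotTime}, each {@code discoveryPeriod} long.
     */
    public static List<Window> windows(Instant snapshotTime, Duration discoveryPeriod, int count) {
        if (discoveryPeriod.isZero() || discoveryPeriod.isNegative()) {
            throw new IllegalArgumentException("discoveryPeriod must be positive");
        }
        if (count < 0) {
            throw new IllegalArgumentException("count must not be negative");
        }
        List<Window> result = new ArrayList<>(count);
        Instant start = snapshotTime;
        for (int i = 0; i < count; i++) {
            Instant end = start.plus(discoveryPeriod);
            result.add(new Window(start, end));
            // The next window begins exactly where this one ends, so with
            // half-open intervals no instant is left uncovered or covered twice.
            start = end;
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical snapshot time, chosen only for the demonstration.
        Instant snapshot = Instant.parse("2025-03-21T10:00:00Z");
        for (Window w : windows(snapshot, Duration.ofMinutes(1), 3)) {
            System.out.println(w.start() + " -> " + w.end());
        }
    }
}
```

Running `main` prints three consecutive one-minute windows starting at the snapshot time, analogous to `.setDiscoveryPeriod(Duration.ofMinutes(1))` in the streaming example. Using half-open `[start, end)` intervals is a design choice of this sketch: adjacent windows share a boundary without overlapping.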