This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git


The following commit(s) were added to refs/heads/main by this push:
     new 6f5ef7c  [FLINK-37527] Add `KuduSource` documentation
6f5ef7c is described below

commit 6f5ef7c7b559ab9d5aa612974283b7698fbc868c
Author: Ferenc Csaky <fcs...@apache.org>
AuthorDate: Fri Mar 21 10:42:38 2025 +0100

    [FLINK-37527] Add `KuduSource` documentation
---
 docs/content/docs/connectors/datastream/kudu.md | 105 ++++++++++++++++++------
 1 file changed, 80 insertions(+), 25 deletions(-)

diff --git a/docs/content/docs/connectors/datastream/kudu.md b/docs/content/docs/connectors/datastream/kudu.md
index 556822c..e479340 100644
--- a/docs/content/docs/connectors/datastream/kudu.md
+++ b/docs/content/docs/connectors/datastream/kudu.md
@@ -44,10 +44,11 @@ The current version of the connector is built with Kudu client version **1.17.1*
 
 ## Reading from Kudu
 
-There are two specific ways of reading a Kudu table into a `DataStream`:
+The connector provides the following ways of reading a Kudu table into a `DataStream`:
 
 1. Using the `KuduCatalog` and Table API programmatically.
-2. Using the `KuduRowInputFormat` directly.
+2. Using the `KuduSource` class.
+3. Using the `KuduRowInputFormat` directly.
 
 ### Kudu Catalog
 
@@ -70,6 +71,56 @@ Table table = tEnv.sqlQuery("SELECT * FROM MyKuduTable");
 DataStream<Row> ds = tEnv.toDataStream(table);
 ```
 
+### Kudu Source
+
+{{< hint info >}}
+This part describes the Kudu Source usage, which is based on the new [data source]({{< ref "docs/dev/datastream/sources.md" >}}) API.
+{{< /hint >}}
+
+The Kudu Source provides a builder class that helps in the construction of the object.
+The code snippet below shows how to build a Kudu Source to read data from an existing Kudu table.
+The `KuduReaderConfig` class provides a way to configure Kudu-specific options that control the read behavior.
+
+```java
+KuduTableInfo tableInfo = KuduTableInfo.forTable("my_kudu_table");
+KuduReaderConfig readerConfig = KuduReaderConfig.Builder
+        .setMasters("localhost:7051")
+        .build();
+
+KuduSource<Row> source =
+        KuduSource.<Row>builder()
+                .setTableInfo(tableInfo)
+                .setReaderConfig(readerConfig)
+                .setRowResultConverter(new RowResultRowConverter())
+                .build();
+
+env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kudu Source");
+```
+
+{{< hint info >}}
+It is also possible to create a non-existing Kudu table. To learn more about that, see the [Create Kudu table]({{< ref "docs/dev/connectors/datastream/kudu" >}}#create-kudu-table) section.
+{{< /hint >}}
+
+#### Boundedness
+
+Although Kudu is a bounded source, it can still be useful to run it in a streaming manner, where the job does not stop until it fails or is stopped/cancelled.
+By default, `KuduSource` runs in bounded mode, but calling `.setBoundedness(Boundedness)` with `CONTINUOUS_UNBOUNDED` switches it to streaming mode.
+
+In `CONTINUOUS_UNBOUNDED` mode, the source follows a CDC-like behavior. This means that at job start it will perform a snapshot of the source table and mark that snapshot time.
+From that point onward the source will perform differential scans periodically, so it will only process the changes made in that specific period.
+The duration of this period is controlled by the `.setDiscoveryPeriod(Duration)` property. The following example shows how to read a Kudu table in a streaming fashion and read updates in 1-minute periods.
+
+```java
+KuduSource<Row> source =
+        KuduSource.<Row>builder()
+                .setTableInfo(...)
+                .setReaderConfig(...)
+                .setRowResultConverter(new RowResultRowConverter())
+                .setBoundedness(Boundedness.CONTINUOUS_UNBOUNDED)
+                .setDiscoveryPeriod(Duration.ofMinutes(1))
+                .build();
+```
+
 ### Kudu Row Input Format
 
 We can also create a `DataStream` by using the `KuduRowInputFormat` directly. In this case we have to manually provide all information about our table:
@@ -79,7 +130,7 @@ KuduTableInfo tableInfo = ...
 KuduReaderConfig readerConfig = ...
 KuduRowInputFormat inputFormat = new KuduRowInputFormat(readerConfig, tableInfo);
 
-DataStream<Row> ds = env.createInput(inputFormat);
+env.createInput(inputFormat);
 ```
 
 ## Writing to Kudu
@@ -114,28 +165,9 @@ KuduSink<Row> sink = KuduSink.<Row>builder()
 ds.sinkTo(sink);
 ```
 
-#### Create Kudu table
-
-If the Kudu table does not exist, we can pass the schema and configuration to `KuduTableInfo`, and the sink will try to create the table based on that.
-
-```java
-KuduTableInfo tableInfo = KuduTableInfo
-    .forTable("new-table")
-    .createTableIfNotExists(
-        () ->
-            Arrays.asList(
-                new ColumnSchema
-                    .ColumnSchemaBuilder("first", Type.INT32)
-                    .key(true)
-                    .build(),
-                new ColumnSchema
-                    .ColumnSchemaBuilder("second", Type.STRING)
-                    .build()
-            ),
-        () -> new CreateTableOptions()
-            .setNumReplicas(3)
-            .addHashPartitions(Lists.newArrayList("first"), 2));
-```
+{{< hint info >}}
+It is also possible to create a non-existing Kudu table. To learn more about that, see the [Create Kudu table]({{< ref "docs/dev/connectors/datastream/kudu" >}}#create-kudu-table) section.
+{{< /hint >}}
 
 ### Kudu Operation Mapping
 
@@ -160,3 +192,26 @@ There are pre-defined operation mappers for POJO, Flink `Row`, and Flink `Tuple`
 * `RowOperationMapper`/`TupleOperationMapper`: The mapping is based on position. The `i`th field of the `Row`/`Tuple`
   corresponds to the column of the table at the `i`th position in the `columnNames` array.
 
+## Create Kudu table
+
+If a table does not exist on the Kudu side, you can pass the desired schema and configuration to `KuduTableInfo` via `createTableIfNotExists(...)`.
+This way either the source or the sink will try to create the table.
+
+```java
+KuduTableInfo tableInfo = KuduTableInfo
+    .forTable("new-table")
+    .createTableIfNotExists(
+        () ->
+            Arrays.asList(
+                new ColumnSchema
+                    .ColumnSchemaBuilder("first", Type.INT32)
+                    .key(true)
+                    .build(),
+                new ColumnSchema
+                    .ColumnSchemaBuilder("second", Type.STRING)
+                    .build()
+            ),
+        () -> new CreateTableOptions()
+            .setNumReplicas(3)
+            .addHashPartitions(Lists.newArrayList("first"), 2));
+```
