This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new 4f300991 [FLINK-32417] Add DynamicKafkaSource documentation for setter 
methods, metrics, and config options. This closes #80
4f300991 is described below

commit 4f30099135fe68e412b4fc18d34c085d654c471e
Author: Mason Chen <mas.c...@berkeley.edu>
AuthorDate: Thu Jan 25 13:56:04 2024 -0800

    [FLINK-32417] Add DynamicKafkaSource documentation for setter methods, 
metrics, and config options. This closes #80
---
 .../docs/connectors/datastream/dynamic-kafka.md    | 248 +++++++++++++++++++++
 .../docs/connectors/table/dynamic-kafka.md         | 141 ------------
 .../docs/connectors/datastream/dynamic-kafka.md    | 137 ++++++++++--
 .../dynamic/source/DynamicKafkaSourceOptions.java  |   4 +-
 4 files changed, 372 insertions(+), 158 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md 
b/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md
new file mode 100644
index 00000000..e46adca3
--- /dev/null
+++ b/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md
@@ -0,0 +1,248 @@
+---
+title: Dynamic Kafka
+weight: 3
+type: docs
+aliases:
+- /dev/connectors/dynamic-kafka.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Dynamic Kafka Source _`Experimental`_
+
+Flink provides an [Apache Kafka](https://kafka.apache.org) connector for 
reading data from Kafka topics from one or more Kafka clusters.
+The Dynamic Kafka connector discovers the clusters and topics using a Kafka 
metadata service and can achieve reading in a dynamic fashion, facilitating 
changes in
+topics and/or clusters, without requiring a job restart. This is especially 
useful when you need to read a new Kafka cluster/topic and/or stop reading
+an existing Kafka cluster/topic (cluster migration/failover/other 
infrastructure changes) and when you need direct integration with Hybrid 
Source. The solution
+automates these operations so that they are transparent to Kafka consumers.
+
+## Dependency
+
+For details on Kafka compatibility, please refer to the official [Kafka 
documentation](https://kafka.apache.org/protocol.html#protocol_compatibility).
+
+{{< connector_artifact flink-connector-kafka 3.1.0 >}}
+
+Flink's streaming connectors are not part of the binary distribution.
+See how to link with them for cluster execution [here]({{< ref 
"docs/dev/configuration/overview" >}}).
+
+## Dynamic Kafka Source
+{{< hint info >}}
+This part describes the Dynamic Kafka Source based on the new
+[data source]({{< ref "docs/dev/datastream/sources.md" >}}) API.
+{{< /hint >}}
+
+### Usage
+
+Dynamic Kafka Source provides a builder class to initialize the 
DynamicKafkaSource. The code snippet
+below shows how to build a DynamicKafkaSource to consume messages from the 
earliest offset of the
+stream "input-stream" and deserialize only the value of the
+ConsumerRecord as a string, using "MyKafkaMetadataService" to resolve the 
cluster(s) and topic(s)
+corresponding to "input-stream".
+
+{{< tabs "DynamicKafkaSource" >}}
+{{< tab "Java" >}}
+```java
+
+DynamicKafkaSource<String> source = DynamicKafkaSource.<String>builder()
+        .setKafkaMetadataService(new MyKafkaMetadataService())
+        .setStreamIds(Collections.singleton("input-stream"))
+        .setStartingOffsets(OffsetsInitializer.earliest())
+        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
+        .setProperties(properties)
+        .build();
+
+env.fromSource(source, WatermarkStrategy.noWatermarks(), "Dynamic Kafka Source");
+```
+{{< /tab >}}
+{{< /tabs >}}
+The following properties are **required** for building a DynamicKafkaSource:
+
+* The Kafka metadata service, configured by setKafkaMetadataService(KafkaMetadataService).
+* The stream ids to subscribe to; see the following Kafka stream subscription section for more details.
+* The deserializer to parse Kafka messages; see the [Kafka Source Documentation]({{< ref "docs/connectors/datastream/kafka" >}}#deserializer) for more details.
+
+### Kafka Stream Subscription
+The Dynamic Kafka Source provides 2 ways of subscribing to Kafka stream(s).
+* A set of Kafka stream ids. For example:
+  {{< tabs "DynamicKafkaSource#setStreamIds" >}}
+  {{< tab "Java" >}}
+  ```java
+  DynamicKafkaSource.builder().setStreamIds(Set.of("stream-a", "stream-b"));
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
+* A regex pattern that subscribes to all Kafka stream ids that match the 
provided regex. For example:
+  {{< tabs "DynamicKafkaSource#setStreamPattern" >}}
+  {{< tab "Java" >}}
+  ```java
+  DynamicKafkaSource.builder().setStreamPattern(Pattern.compile("stream.*"));
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
+
+### Kafka Metadata Service
+
+An interface is provided to resolve the logical Kafka stream(s) into the 
corresponding physical
+topic(s) and cluster(s). Typically, these implementations are based on 
services that align well
+with internal Kafka infrastructure; if that is not available, an in-memory implementation
+would also work. An example of an in-memory implementation can be found in our tests.
+
+This source achieves its dynamic characteristic by periodically polling this 
Kafka metadata service
+for any changes to the Kafka stream(s) and reconciling the reader tasks to 
subscribe to the new
+Kafka metadata returned by the service. For example, in the case of a Kafka 
migration, the source would
+swap from one cluster to the new cluster when the service makes that change in 
the Kafka stream metadata.
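+
+To make this contract concrete, the sketch below shows what the "MyKafkaMetadataService" used in the usage example above could look like as an in-memory implementation. It is only a sketch under stated assumptions: it assumes the KafkaMetadataService interface exposes getAllStreams(), describeStreams(Collection<String>) and isClusterActive(String), that KafkaStream and ClusterMetadata can be constructed from a stream id, a topic set and connection properties, and that these classes live under the dynamic metadata package; verify against the interface shipped with your connector version.
+
+```java
+import org.apache.flink.connector.kafka.dynamic.metadata.ClusterMetadata;
+import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
+import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+// Hypothetical in-memory metadata service serving a fixed view of
+// stream id -> (cluster id -> topics + connection properties).
+public class MyKafkaMetadataService implements KafkaMetadataService {
+
+    private final Map<String, KafkaStream> streams = new HashMap<>();
+
+    public MyKafkaMetadataService() {
+        Properties clusterProperties = new Properties();
+        // Placeholder bootstrap servers for the single cluster backing "input-stream".
+        clusterProperties.setProperty("bootstrap.servers", "broker-1:9092");
+        ClusterMetadata clusterMetadata =
+                new ClusterMetadata(Collections.singleton("input-topic"), clusterProperties);
+        streams.put(
+                "input-stream",
+                new KafkaStream("input-stream", Collections.singletonMap("cluster-0", clusterMetadata)));
+    }
+
+    @Override
+    public Set<KafkaStream> getAllStreams() {
+        return new HashSet<>(streams.values());
+    }
+
+    @Override
+    public Map<String, KafkaStream> describeStreams(Collection<String> streamIds) {
+        // Return metadata only for the requested stream ids that this service knows about.
+        return streamIds.stream()
+                .filter(streams::containsKey)
+                .collect(Collectors.toMap(id -> id, streams::get));
+    }
+
+    @Override
+    public boolean isClusterActive(String kafkaClusterId) {
+        return "cluster-0".equals(kafkaClusterId);
+    }
+
+    @Override
+    public void close() {}
+}
+```
+A production implementation would typically back these methods with a call to an internal metadata or service-discovery endpoint instead of a static map, so that cluster and topic changes propagate to the source on the next poll.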
+
+### Additional Properties
+DynamicKafkaSourceOptions defines configuration options that can be set through the properties passed to the builder:
+<table class="table table-bordered">
+    <thead>
+      <tr>
+      <th class="text-left" style="width: 25%">Option</th>
+      <th class="text-center" style="width: 8%">Required</th>
+      <th class="text-center" style="width: 7%">Default</th>
+      <th class="text-center" style="width: 10%">Type</th>
+      <th class="text-center" style="width: 50%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>stream-metadata-discovery-interval-ms</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">-1</td>
+      <td>Long</td>
+      <td>The interval in milliseconds for the source to discover the changes 
in stream metadata. A non-positive value disables the stream metadata 
discovery.</td>
+    </tr>
+    <tr>
+      <td><h5>stream-metadata-discovery-failure-threshold</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">1</td>
+      <td>Integer</td>
+      <td>The number of consecutive failures before letting the exception from 
Kafka metadata service discovery trigger jobmanager failure and global 
failover. The default is one to at least catch startup failures.</td>
+    </tr>
+    </tbody>
+</table>
+
+
+In addition to this list, see the [regular Kafka connector]({{< ref 
"docs/connectors/datastream/kafka" >}}#additional-properties) for
+a list of applicable properties.
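+
+For example, setting a one-minute metadata discovery interval and a slightly higher failure tolerance might look like the sketch below; the option keys are taken from the table above, while the values and the metadata service are placeholders:
+
+```java
+Properties properties = new Properties();
+// Poll the Kafka metadata service for stream/cluster changes every 60 seconds (placeholder value).
+properties.setProperty("stream-metadata-discovery-interval-ms", "60000");
+// Tolerate up to 3 consecutive metadata discovery failures before triggering global failover (placeholder value).
+properties.setProperty("stream-metadata-discovery-failure-threshold", "3");
+
+DynamicKafkaSource<String> source = DynamicKafkaSource.<String>builder()
+        .setKafkaMetadataService(new MyKafkaMetadataService())
+        .setStreamIds(Collections.singleton("input-stream"))
+        .setStartingOffsets(OffsetsInitializer.earliest())
+        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
+        .setProperties(properties)
+        .build();
+```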
+
+### Metrics
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 15%">Scope</th>
+      <th class="text-left" style="width: 18%">Metrics</th>
+      <th class="text-left" style="width: 18%">User Variables</th>
+      <th class="text-left" style="width: 39%">Description</th>
+      <th class="text-left" style="width: 10%">Type</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <th rowspan="5">Operator</th>
+        <td>currentEmitEventTimeLag</td>
+        <td>n/a</td>
+        <td>The time span from the record event timestamp to the time the record is emitted by the source connector: <code>currentEmitEventTimeLag = EmitTime - EventTime</code></td>
+        <td>Gauge</td>
+    </tr>
+    <tr>
+        <td>watermarkLag</td>
+        <td>n/a</td>
+        <td>The time span that the watermark lags behind the wall clock time: 
<code>watermarkLag = CurrentTime - Watermark</code></td>
+        <td>Gauge</td>
+    </tr>
+    <tr>
+        <td>sourceIdleTime</td>
+        <td>n/a</td>
+        <td>The time span that the source has not processed any record: 
<code>sourceIdleTime = CurrentTime - LastRecordProcessTime</code></td>
+        <td>Gauge</td>
+    </tr>
+    <tr>
+        <td>pendingRecords</td>
+        <td>n/a</td>
+        <td>The number of records that have not been fetched by the source. 
e.g. the available records after the consumer offset in a Kafka partition.</td>
+        <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>kafkaClustersCount</td>
+      <td>n/a</td>
+      <td>The total number of Kafka clusters read by this reader.</td>
+      <td>Gauge</td>
+    </tr>
+  </tbody>
+</table>
+
+In addition to this list, see the [regular Kafka connector]({{< ref 
"docs/connectors/datastream/kafka" >}}#monitoring) for
+the KafkaSourceReader metrics that are also reported.
+
+### Additional Details
+
+For additional details on deserialization, event time and watermarks, 
idleness, consumer offset
+committing, security, and more, you can refer to the [Kafka Source 
documentation]({{< ref "docs/connectors/datastream/kafka" >}}#kafka-source). 
This is possible because the
+Dynamic Kafka Source leverages components of the Kafka Source, and the 
implementation will be
+discussed in the next section.
+
+### Behind the Scene
+{{< hint info >}}
+If you are interested in how Kafka source works under the design of new data 
source API, you may
+want to read this part as a reference. For details about the new data source 
API,
+[documentation of data source]({{< ref "docs/dev/datastream/sources.md" >}}) 
and
+<a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface">FLIP-27</a>
+provide more descriptive discussions.
+{{< /hint >}}
+
+
+Under the abstraction of the new data source API, Dynamic Kafka Source 
consists of the following components:
+#### Source Split
+A source split in Dynamic Kafka Source represents a partition of a Kafka 
topic, with cluster information. It
+consists of:
+* A Kafka cluster id that can be resolved by the Kafka metadata service.
+* A Kafka Source Split (TopicPartition, starting offset, stopping offset).
+
+You can check the class `DynamicKafkaSourceSplit` for more details.
+
+#### Split Enumerator
+
+This enumerator is responsible for discovering and assigning splits from one 
or more clusters. At startup, the
+enumerator will discover metadata belonging to the Kafka stream ids. Using the 
metadata, it can
+initialize KafkaSourceEnumerators to handle the functions of assigning splits 
to the readers. In addition,
+source events will be sent to the source reader to reconcile the metadata. This enumerator polls the
+KafkaMetadataService periodically for stream discovery. Restarting enumerators when metadata changes also
+involves clearing outdated metrics, since clusters may be removed and their metrics should be removed with them.
+
+#### Source Reader
+
+This reader is responsible for reading from one or more clusters and using the 
KafkaSourceReader to fetch
+records from topics and clusters based on the metadata. When new metadata is 
discovered by the enumerator,
+the reader will reconcile metadata changes to possibly restart the 
KafkaSourceReader to read from the new
+set of topics and clusters.
+
+#### Kafka Metadata Service
+
+This interface represents the source of truth for the current metadata for the 
configured Kafka stream ids.
+Metadata that is removed in between polls is considered non-active (e.g. 
removing a cluster from the
+return value means that the cluster is non-active and should not be read from). The cluster metadata
+contains an immutable Kafka cluster id, the set of topics, and properties 
needed to connect to the
+Kafka cluster.
+
+#### FLIP 246
+
+To understand more behind the scenes, please read 
[FLIP-246](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217389320)
+for more details and discussion.
diff --git a/docs/content.zh/docs/connectors/table/dynamic-kafka.md 
b/docs/content.zh/docs/connectors/table/dynamic-kafka.md
deleted file mode 100644
index f00a2f22..00000000
--- a/docs/content.zh/docs/connectors/table/dynamic-kafka.md
+++ /dev/null
@@ -1,141 +0,0 @@
----
-title: Kafka
-weight: 3
-type: docs
-aliases:
-  - /zh/dev/connectors/dynamic-kafka.html
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-# Dynamic Kafka Source _`Experimental`_
-
-Flink provides an [Apache Kafka](https://kafka.apache.org) connector for 
reading data from and 
-writing data to Kafka topics from one or more Kafka clusters. This connector 
achieves this in a dynamic 
-fashion, without requiring a job restart, using a Kafka metadata service to 
facilitate changes in 
-topics and/or clusters. This is especially useful in transparent Kafka cluster 
addition/removal without 
-Flink job restart, transparent Kafka topic addition/removal without Flink job 
restart, and direct integration
-with Hybrid Source.
-
-## Dependency
-
-For details on Kafka compatibility, please refer to the official [Kafka 
documentation](https://kafka.apache.org/protocol.html#protocol_compatibility).
-
-{{< connector_artifact flink-connector-kafka 3.1.0 >}}
-
-Flink's streaming connectors are not part of the binary distribution.
-See how to link with them for cluster execution [here]({{< ref 
"docs/dev/configuration/overview" >}}).
-
-## Dynamic Kafka Source
-{{< hint info >}}
-This part describes the Dynamic Kafka Source based on the new
-[data source]({{< ref "docs/dev/datastream/sources.md" >}}) API.
-{{< /hint >}}
-
-## Usage
-
-Dynamic Kafka Source provides a builder class to initialize the 
DynamicKafkaSource. The code snippet 
-below shows how to build a DynamicKafkaSource to consume messages from the 
earliest offset of the 
-stream "input-stream" and deserialize only the value of the 
-ConsumerRecord as a string, using "MyKafkaMetadataService" to resolve the 
cluster(s) and topic(s)
-corresponding to "input-stream".
-
-{{< tabs "KafkaSource" >}}
-{{< tab "Java" >}}
-```java
-
-DynamicKafkaSource<String> source = DynamicKafkaSource.<String>builder()
-    .setKafkaMetadataService(new MyKafkaMetadataService())
-    .setStreamIds(Collections.singleton("input-stream"))
-    
.setStartingOffsets(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
-    .setDeserializer(new SimpleStringSchema())
-    .setProperties(properties)
-    .build();
-
-env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-### Kafka Metadata Service
-
-An interface is provided to resolve the logical Kafka stream(s) into the 
corresponding physical 
-topic(s) and cluster(s). Typically, these implementations are based on 
services that align well
-with internal Kafka infrastructure--if that is not available, an in-memory 
implementation 
-would also work. An example of in-memory implementation can be found in our 
tests.
-
-This source achieves its dynamic characteristic by periodically polling this 
Kafka metadata service
-for any changes to the Kafka stream(s) and reconciling the reader tasks to 
subscribe to the new 
-Kafka metadata returned by the service. For example, in the case of a Kafka 
migration, the source would 
-swap from one cluster to the new cluster when the service makes that change in 
the Kafka stream metadata.
-
-### Additional Details
-
-For additional details on deserialization, event time and watermarks, 
idleness, consumer offset 
-committing, security, and more, you can refer to the Kafka Source 
documentation. This is possible because the 
-Dynamic Kafka Source leverages components of the Kafka Source, and the 
implementation will be 
-discussed in the next section.
-
-### Behind the Scene
-{{< hint info >}}
-If you are interested in how Kafka source works under the design of new data 
source API, you may
-want to read this part as a reference. For details about the new data source 
API,
-[documentation of data source]({{< ref "docs/dev/datastream/sources.md" >}}) 
and
-<a 
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface";>FLIP-27</a>
-provide more descriptive discussions.
-{{< /hint >}}
-
-
-Under the abstraction of the new data source API, Dynamic Kafka Source 
consists of the following components:
-#### Source Split
-A source split in Dynamic Kafka Source represents a partition of a Kafka 
topic, with cluster information. It
-consists of:
-* A Kafka cluster id that can be resolved by the Kafka metadata service.
-* A Kafka Source Split (TopicPartition, starting offset, stopping offset).
-
-You can check the class `DynamicKafkaSourceSplit` for more details.
-
-#### Split Enumerator
-
-This enumerator is responsible for discovering and assigning splits from 1+ 
cluster. At startup, the
-enumerator will discover metadata belonging to the Kafka stream ids. Using the 
metadata, it can 
-initialize KafkaSourceEnumerators to handle the functions of assigning splits 
to the readers. In addition,
-source events will be sent to the source reader to reconcile the metadata. 
This enumerator has the ability to poll the 
-KafkaMetadataService, periodically for stream discovery. In addition, 
restarting enumerators when metadata changes involve 
-clearing outdated metrics since clusters may be removed and so should their 
metrics.
-
-#### Source Reader
-
-This reader is responsible for reading from 1+ clusters and using the 
KafkaSourceReader to fetch 
-records from topics and clusters based on the metadata. When new metadata is 
discovered by the enumerator,
-the reader will reconcile metadata changes to possibly restart the 
KafkaSourceReader to read from the new 
-set of topics and clusters.
-
-#### Kafka Metadata Service
-
-This interface represents the source of truth for the current metadata for the 
configured Kafka stream ids.
-Metadata that is removed in between polls is considered non-active (e.g. 
removing a cluster from the 
-return value, means that a cluster is non-active and should not be read from). 
The cluster metadata 
-contains an immutable Kafka cluster id, the set of topics, and properties 
needed to connect to the
-Kafka cluster.
-
-#### FLIP 246
-
-To understand more behind the scenes, please read 
[FLIP-246](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217389320)
-for more details and discussion.
diff --git a/docs/content/docs/connectors/datastream/dynamic-kafka.md 
b/docs/content/docs/connectors/datastream/dynamic-kafka.md
index 903a26d6..4c6e38fc 100644
--- a/docs/content/docs/connectors/datastream/dynamic-kafka.md
+++ b/docs/content/docs/connectors/datastream/dynamic-kafka.md
@@ -1,5 +1,5 @@
 ---
-title: Kafka
+title: Dynamic Kafka
 weight: 3
 type: docs
 aliases:
@@ -26,12 +26,11 @@ under the License.
 
 # Dynamic Kafka Source _`Experimental`_
 
-Flink provides an [Apache Kafka](https://kafka.apache.org) connector for 
reading data from and 
-writing data to Kafka topics from one or more Kafka clusters. This connector 
achieves this in a dynamic 
-fashion, without requiring a job restart, using a Kafka metadata service to 
facilitate changes in 
-topics and/or clusters. This is especially useful in transparent Kafka cluster 
addition/removal without 
-Flink job restart, transparent Kafka topic addition/removal without Flink job 
restart, and direct integration
-with Hybrid Source.
+Flink provides an [Apache Kafka](https://kafka.apache.org) connector for 
reading data from Kafka topics from one or more Kafka clusters. 
+The Dynamic Kafka connector discovers the clusters and topics using a Kafka 
metadata service and can achieve reading in a dynamic fashion, facilitating 
changes in 
+topics and/or clusters, without requiring a job restart. This is especially 
useful when you need to read a new Kafka cluster/topic and/or stop reading 
+an existing Kafka cluster/topic (cluster migration/failover/other 
infrastructure changes) and when you need direct integration with Hybrid 
Source. The solution 
+automates these operations so that they are transparent to Kafka consumers.
 
 ## Dependency
 
@@ -48,7 +47,7 @@ This part describes the Dynamic Kafka Source based on the new
 [data source]({{< ref "docs/dev/datastream/sources.md" >}}) API.
 {{< /hint >}}
 
-## Usage
+### Usage
 
 Dynamic Kafka Source provides a builder class to initialize the 
DynamicKafkaSource. The code snippet 
 below shows how to build a DynamicKafkaSource to consume messages from the 
earliest offset of the 
@@ -56,22 +55,46 @@ stream "input-stream" and deserialize only the value of the
 ConsumerRecord as a string, using "MyKafkaMetadataService" to resolve the 
cluster(s) and topic(s)
 corresponding to "input-stream".
 
-{{< tabs "KafkaSource" >}}
+{{< tabs "DynamicKafkaSource" >}}
 {{< tab "Java" >}}
 ```java
 
 DynamicKafkaSource<String> source = DynamicKafkaSource.<String>builder()
     .setKafkaMetadataService(new MyKafkaMetadataService())
     .setStreamIds(Collections.singleton("input-stream"))
-    
.setStartingOffsets(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
-    .setDeserializer(new SimpleStringSchema())
+    .setStartingOffsets(OffsetsInitializer.earliest())
+    .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
     .setProperties(properties)
     .build();
 
-env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
+env.fromSource(source, WatermarkStrategy.noWatermarks(), "Dynamic Kafka Source");
 ```
 {{< /tab >}}
 {{< /tabs >}}
+The following properties are **required** for building a DynamicKafkaSource:
+
+* The Kafka metadata service, configured by setKafkaMetadataService(KafkaMetadataService).
+* The stream ids to subscribe to; see the following Kafka stream subscription section for more details.
+* The deserializer to parse Kafka messages, see the [Kafka Source Documentation]({{< ref "docs/connectors/datastream/kafka" >}}#deserializer) for more details.
+
+### Kafka Stream Subscription
+The Dynamic Kafka Source provides 2 ways of subscribing to Kafka stream(s).
+* A set of Kafka stream ids. For example:
+  {{< tabs "DynamicKafkaSource#setStreamIds" >}}
+  {{< tab "Java" >}}
+  ```java
+  DynamicKafkaSource.builder().setStreamIds(Set.of("stream-a", "stream-b"));
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
+* A regex pattern that subscribes to all Kafka stream ids that match the 
provided regex. For example:
+  {{< tabs "DynamicKafkaSource#setStreamPattern" >}}
+  {{< tab "Java" >}}
+  ```java
+  DynamicKafkaSource.builder().setStreamPattern(Pattern.compile("stream.*"));
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
 
 ### Kafka Metadata Service
 
@@ -85,10 +108,94 @@ for any changes to the Kafka stream(s) and reconciling the 
reader tasks to subsc
 Kafka metadata returned by the service. For example, in the case of a Kafka 
migration, the source would 
 swap from one cluster to the new cluster when the service makes that change in 
the Kafka stream metadata.
 
+### Additional Properties
+DynamicKafkaSourceOptions defines configuration options that can be set through the properties passed to the builder:
+<table class="table table-bordered">
+    <thead>
+      <tr>
+      <th class="text-left" style="width: 25%">Option</th>
+      <th class="text-center" style="width: 8%">Required</th>
+      <th class="text-center" style="width: 7%">Default</th>
+      <th class="text-center" style="width: 10%">Type</th>
+      <th class="text-center" style="width: 50%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>stream-metadata-discovery-interval-ms</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">-1</td>
+      <td>Long</td>
+      <td>The interval in milliseconds for the source to discover the changes 
in stream metadata. A non-positive value disables the stream metadata 
discovery.</td>
+    </tr>
+    <tr>
+      <td><h5>stream-metadata-discovery-failure-threshold</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">1</td>
+      <td>Integer</td>
+      <td>The number of consecutive failures before letting the exception from 
Kafka metadata service discovery trigger jobmanager failure and global 
failover. The default is one to at least catch startup failures.</td>
+    </tr>
+    </tbody>
+</table>
+
+
+In addition to this list, see the [regular Kafka connector]({{< ref 
"docs/connectors/datastream/kafka" >}}#additional-properties) for
+a list of applicable properties.
+
+### Metrics
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 15%">Scope</th>
+      <th class="text-left" style="width: 18%">Metrics</th>
+      <th class="text-left" style="width: 18%">User Variables</th>
+      <th class="text-left" style="width: 39%">Description</th>
+      <th class="text-left" style="width: 10%">Type</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <th rowspan="5">Operator</th>
+        <td>currentEmitEventTimeLag</td>
+        <td>n/a</td>
+        <td>The time span from the record event timestamp to the time the record is emitted by the source connector: <code>currentEmitEventTimeLag = EmitTime - EventTime</code></td>
+        <td>Gauge</td>
+    </tr>
+    <tr>
+        <td>watermarkLag</td>
+        <td>n/a</td>
+        <td>The time span that the watermark lags behind the wall clock time: 
<code>watermarkLag = CurrentTime - Watermark</code></td>
+        <td>Gauge</td>
+    </tr>
+    <tr>
+        <td>sourceIdleTime</td>
+        <td>n/a</td>
+        <td>The time span that the source has not processed any record: 
<code>sourceIdleTime = CurrentTime - LastRecordProcessTime</code></td>
+        <td>Gauge</td>
+    </tr>
+    <tr>
+        <td>pendingRecords</td>
+        <td>n/a</td>
+        <td>The number of records that have not been fetched by the source. 
e.g. the available records after the consumer offset in a Kafka partition.</td>
+        <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>kafkaClustersCount</td>
+      <td>n/a</td>
+      <td>The total number of Kafka clusters read by this reader.</td>
+      <td>Gauge</td>
+    </tr>
+  </tbody>
+</table>
+
+In addition to this list, see the [regular Kafka connector]({{< ref 
"docs/connectors/datastream/kafka" >}}#monitoring) for
+the KafkaSourceReader metrics that are also reported.
+
 ### Additional Details
 
 For additional details on deserialization, event time and watermarks, 
idleness, consumer offset 
-committing, security, and more, you can refer to the Kafka Source 
documentation. This is possible because the 
+committing, security, and more, you can refer to the [Kafka Source 
documentation]({{< ref "docs/connectors/datastream/kafka" >}}#kafka-source). 
This is possible because the 
 Dynamic Kafka Source leverages components of the Kafka Source, and the 
implementation will be 
 discussed in the next section.
 
@@ -113,7 +220,7 @@ You can check the class `DynamicKafkaSourceSplit` for more 
details.
 
 #### Split Enumerator
 
-This enumerator is responsible for discovering and assigning splits from 1+ 
cluster. At startup, the
+This enumerator is responsible for discovering and assigning splits from one 
or more clusters. At startup, the
 enumerator will discover metadata belonging to the Kafka stream ids. Using the 
metadata, it can 
 initialize KafkaSourceEnumerators to handle the functions of assigning splits 
to the readers. In addition,
 source events will be sent to the source reader to reconcile the metadata. 
This enumerator has the ability to poll the 
@@ -122,7 +229,7 @@ clearing outdated metrics since clusters may be removed and 
so should their metr
 
 #### Source Reader
 
-This reader is responsible for reading from 1+ clusters and using the 
KafkaSourceReader to fetch 
+This reader is responsible for reading from one or more clusters and using the 
KafkaSourceReader to fetch 
 records from topics and clusters based on the metadata. When new metadata is 
discovered by the enumerator,
 the reader will reconcile metadata changes to possibly restart the 
KafkaSourceReader to read from the new 
 set of topics and clusters.
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceOptions.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceOptions.java
index 074798ce..bdecaf39 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceOptions.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceOptions.java
@@ -39,7 +39,7 @@ public class DynamicKafkaSourceOptions {
                     .longType()
                     .defaultValue(-1L)
                     .withDescription(
-                            "The interval in milliseconds for the sink to 
discover "
+                            "The interval in milliseconds for the source to 
discover "
                                     + "the changes in stream metadata. A 
non-positive value disables the stream metadata discovery.");
 
     public static final ConfigOption<Integer> 
STREAM_METADATA_DISCOVERY_FAILURE_THRESHOLD =
@@ -49,7 +49,7 @@ public class DynamicKafkaSourceOptions {
                     .withDescription(
                             "The number of consecutive failures before letting 
the exception from Kafka metadata service discovery "
                                     + "trigger jobmanager failure and global 
failover. The default is one to at least catch startup "
-                                    + "failures. This is only implemented for 
the source");
+                                    + "failures.");
 
     @Internal
     public static <T> T getOption(
