This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 678370b18e1 [FLINK-28853] Document the split level watermark alignment feature and fixup grammar from the configuration table
678370b18e1 is described below

commit 678370b18e1b6c4a23e5ce08f8efd05675a0cc17
Author: mas-chen <mas.c...@berkeley.edu>
AuthorDate: Thu May 4 23:48:11 2023 -0700

    [FLINK-28853] Document the split level watermark alignment feature and fixup grammar from the configuration table
---
 .../datastream/event-time/generating_watermarks.md | 22 ++++++++++++----------
 docs/content/docs/dev/datastream/sources.md        |  6 ++++++
 .../generated/pipeline_configuration.html          |  2 +-
 .../flink/configuration/PipelineOptions.java       |  4 ++--
 4 files changed, 21 insertions(+), 13 deletions(-)

diff --git a/docs/content/docs/dev/datastream/event-time/generating_watermarks.md b/docs/content/docs/dev/datastream/event-time/generating_watermarks.md
index 88c8488ba8c..35bbd147127 100644
--- a/docs/content/docs/dev/datastream/event-time/generating_watermarks.md
+++ b/docs/content/docs/dev/datastream/event-time/generating_watermarks.md
@@ -304,19 +304,21 @@ other sources/tasks which can move the combined watermark forward and that way u
 one.
 
 {{< hint warning >}}
-**Note:** As of 1.15, Flink supports aligning across tasks of the same source and/or different
-sources. It does not support aligning splits/partitions/shards in the same task.
+**Note:** As of Flink 1.17, split level watermark alignment is supported by the FLIP-27 source framework.
+Source connectors have to implement an interface to resume and pause splits so that splits/partitions/shards
+can be aligned in the same task. More details on the pause and resume interfaces can be found in the [Source API]({{< ref "docs/dev/datastream/sources" >}}#split-level-watermark-alignment).
 
-In a case where there are e.g. two Kafka partitions that produce watermarks at different pace, that
-get assigned to the same task watermark might not behave as expected. Fortunately, worst case it
-should not perform worse than without alignment.
+If you are upgrading from a Flink version between 1.15.x and 1.16.x inclusive, you can disable split level alignment by setting
+`pipeline.watermark-alignment.allow-unaligned-source-splits` to true. Moreover, you can tell whether your source supports split level alignment
+by checking whether it throws an `UnsupportedOperationException` at runtime or by reading the javadocs. If it does not support it, it is advisable
+to disable split level watermark alignment to avoid fatal exceptions.
 
-Given the limitation above, we suggest applying watermark alignment in two situations:
-
-1. You have two different sources (e.g. Kafka and File) that produce watermarks at different speeds
-2. You run your source with parallelism equal to the number of splits/shards/partitions, which
-   results in every subtask being assigned a single unit of work.
+When the flag is set to true, watermark alignment works properly only when the number of splits/shards/partitions is equal to the
+parallelism of the source operator. This results in every subtask being assigned a single unit of work. On the other hand, if two Kafka partitions that produce watermarks at different paces
+get assigned to the same task, then watermarks might not behave as expected. Fortunately, even in the worst case, the basic alignment should not perform worse than having no alignment at all.
 
+Furthermore, Flink also supports aligning across tasks of the same source and/or different
+sources, which is useful when you have two different sources (e.g. Kafka and File) that produce watermarks at different speeds.
 {{< /hint >}}
 
 ## Writing WatermarkGenerators
diff --git a/docs/content/docs/dev/datastream/sources.md b/docs/content/docs/dev/datastream/sources.md
index 020738cdbb1..0b755dc10aa 100644
--- a/docs/content/docs/dev/datastream/sources.md
+++ b/docs/content/docs/dev/datastream/sources.md
@@ -428,3 +428,9 @@ The data source API supports running watermark generators individually *per spli
 When implementing a source connector using the *Split Reader API*, this is automatically handled. All implementations based on the Split Reader API have split-aware watermarks out-of-the-box.
 
 For an implementation of the lower level `SourceReader` API to use split-aware watermark generation, the implementation must output events from different splits to different outputs: the *Split-local SourceOutputs*. Split-local outputs can be created and released on the main {{< gh_link file="flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java" name="ReaderOutput" >}} via the `createOutputForSplit(splitId)` and `releaseOutputForSplit(splitId)` methods. Please [...]
+
+#### Split Level Watermark Alignment
+
+Although source operator watermark alignment is handled by the Flink runtime, the source needs to additionally implement `SourceReader#pauseOrResumeSplits` and `SplitReader#pauseOrResumeSplits` to achieve split level watermark alignment. Split level watermark alignment is useful when
+there are multiple splits assigned to a source reader. By default, these implementations throw an `UnsupportedOperationException` if `pipeline.watermark-alignment.allow-unaligned-source-splits` is set to false, more than one split is assigned, and a split exceeds the watermark alignment threshold configured by the `WatermarkStrategy`. `SourceReaderBase`
+contains an implementation of `SourceReader#pauseOrResumeSplits` so that inheriting sources only need to implement `SplitReader#pauseOrResumeSplits`. See the javadocs for more implementation hints.
diff --git a/docs/layouts/shortcodes/generated/pipeline_configuration.html b/docs/layouts/shortcodes/generated/pipeline_configuration.html
index d42329f6f9d..f2b8b660f5a 100644
--- a/docs/layouts/shortcodes/generated/pipeline_configuration.html
+++ b/docs/layouts/shortcodes/generated/pipeline_configuration.html
@@ -138,7 +138,7 @@
             <td><h5>pipeline.watermark-alignment.allow-unaligned-source-splits</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Boolean</td>
-            <td>If watermark alignment is used, sources with multiple splits will attempt to pause/resume split readers to avoid watermark drift of source splits. However, if split readers don't support pause/resume an UnsupportedOperationException will be thrown when there is an attempt to pause/resume. To allow use of split readers that don't support pause/resume and, hence, t allow unaligned splits while still using watermark alignment, set this parameter to true. The default value is [...]
+            <td>If watermark alignment is used, sources with multiple splits will attempt to pause/resume split readers to avoid watermark drift of source splits. However, if split readers don't support pause/resume, an UnsupportedOperationException will be thrown when there is an attempt to pause/resume. To allow use of split readers that don't support pause/resume and, hence, to allow unaligned splits while still using watermark alignment, set this parameter to true. The default value  [...]
         </tr>
     </tbody>
 </table>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
index 4c739e98426..f8bacdf494b 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
@@ -303,10 +303,10 @@ public class PipelineOptions {
                             "If watermark alignment is used, sources with multiple splits will "
                                     + "attempt to pause/resume split readers to avoid watermark "
                                     + "drift of source splits. "
-                                    + "However, if split readers don't support pause/resume an "
+                                    + "However, if split readers don't support pause/resume, an "
                                     + "UnsupportedOperationException will be thrown when there is "
                                     + "an attempt to pause/resume. To allow use of split readers that "
-                                    + "don't support pause/resume and, hence, t allow unaligned splits "
+                                    + "don't support pause/resume and, hence, to allow unaligned splits "
                                     + "while still using watermark alignment, set this parameter to true. "
                                     + "The default value is false. Note: This parameter may be "
                                     + "removed in future releases.");
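
The split level pause/resume decision described in the added sources.md section can be illustrated with a minimal, framework-free sketch. It is not Flink code and does not use any Flink classes: `SplitAlignmentSketch`, `PausableReader`, `RecordingReader`, and `align` are hypothetical names introduced here, and only the `pauseOrResumeSplits(splitsToPause, splitsToResume)` shape is modeled on the method named in the docs. The sketch assumes that a split is paused once its watermark exceeds a maximum allowed watermark and resumed once it no longer does.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical illustration only; none of these types come from Flink.
class SplitAlignmentSketch {

    /** Stand-in for a reader that can pause and resume individual splits. */
    interface PausableReader {
        void pauseOrResumeSplits(Collection<String> splitsToPause, Collection<String> splitsToResume);
    }

    /** Reader that simply records which splits are currently paused. */
    static class RecordingReader implements PausableReader {
        final Set<String> paused = new TreeSet<>();

        @Override
        public void pauseOrResumeSplits(Collection<String> splitsToPause, Collection<String> splitsToResume) {
            paused.addAll(splitsToPause);
            paused.removeAll(splitsToResume);
        }
    }

    /**
     * Pauses splits whose watermark is ahead of maxAllowedWatermark and
     * resumes paused splits that are no longer ahead of it.
     */
    static void align(Map<String, Long> splitWatermarks, long maxAllowedWatermark, RecordingReader reader) {
        List<String> toPause = new ArrayList<>();
        List<String> toResume = new ArrayList<>();
        for (Map.Entry<String, Long> entry : splitWatermarks.entrySet()) {
            boolean ahead = entry.getValue() > maxAllowedWatermark;
            boolean isPaused = reader.paused.contains(entry.getKey());
            if (ahead && !isPaused) {
                toPause.add(entry.getKey());
            } else if (!ahead && isPaused) {
                toResume.add(entry.getKey());
            }
        }
        // Only call the reader when there is something to change.
        if (!toPause.isEmpty() || !toResume.isEmpty()) {
            reader.pauseOrResumeSplits(toPause, toResume);
        }
    }

    public static void main(String[] args) {
        RecordingReader reader = new RecordingReader();
        Map<String, Long> watermarks = new TreeMap<>();
        watermarks.put("partition-0", 100L);
        watermarks.put("partition-1", 500L);

        align(watermarks, 300L, reader);
        System.out.println("paused: " + reader.paused); // paused: [partition-1]

        align(watermarks, 600L, reader);
        System.out.println("paused: " + reader.paused); // paused: []
    }
}
```

In this sketch the fast split ("partition-1") stops being read while the slow one catches up, which is the behavior a connector would need to provide in its own `pauseOrResumeSplits` implementation. A connector without such an implementation is the case the `pipeline.watermark-alignment.allow-unaligned-source-splits` option is meant for.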
