This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new 020bc33c [FLINK-35886] Leave a note for future Flink 2.0 upgrade
020bc33c is described below

commit 020bc33c4e327437bc9f352f4b5d9b479a42568b
Author: Piotr Nowojski <[email protected]>
AuthorDate: Thu Aug 29 10:32:25 2024 +0200

    [FLINK-35886] Leave a note for future Flink 2.0 upgrade
    
    Fixing FLIP-471 in the old source would require some extra work, that we hopefully can avoid by removing the old source before doing the upgrade.
---
 .../streaming/connectors/kafka/internals/AbstractFetcher.java     | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 841d4528..41b5ad24 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -435,6 +435,14 @@ public abstract class AbstractFetcher<T, KPH> {
                                         kafkaHandle,
                                         deserializedWatermarkStrategy.createTimestampAssigner(
                                                 () -> consumerMetricGroup),
+                                        // When upgrading to Flink 2.0, context has to provide also
+                                        // the input activity clock. This is not trivial for the old
+                                        // sources. Ideally we should drop this old source before
+                                        // this connector is upgraded to Flink 2.0. Otherwise, we
+                                        // can avoid the compilation error without fixing the bug
+                                        // addressed by the FLIP-471, by returning SystemClock,
+                                        // which would reproduce the pre-FLIP-471 behavior (without
+                                        // fixing the underlying bug).
                                         deserializedWatermarkStrategy.createWatermarkGenerator(
                                                 () -> consumerMetricGroup),
                                         immediateOutput,
