This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch f35886
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit b08efd318ad4d1b0c6ad1f4664eb95a7699db633
Author: Piotr Nowojski <[email protected]>
AuthorDate: Thu Aug 29 10:32:25 2024 +0200

    [FLINK-35886] Leave a note for future Flink 2.0 upgrade
    
    Fixing FLIP-471 in the old source would require some extra work, that we 
hopefully can avoid by removing the old source before doing the upgrade.
---
 .../streaming/connectors/kafka/internals/AbstractFetcher.java      | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 841d4528..486df891 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -435,6 +435,13 @@ public abstract class AbstractFetcher<T, KPH> {
                                         kafkaHandle,
                                         
deserializedWatermarkStrategy.createTimestampAssigner(
                                                 () -> consumerMetricGroup),
+                                        // When upgrading to Flink 2.0, 
context has to provide also
+                                        // the input activity clock. This is 
not trivial for the old
+                                        // sources. Ideally we should drop 
this old source before
+                                        // this connector is upgraded to Flink 
2.0. Otherwise, we
+                                        // can avoid the compilation error 
without fixing the
+                                        // FLIP-471 bug, by returning 
SystemClock as the input
+                                        // activity clock. Note that would be 
incorrect thing to do.
                                         
deserializedWatermarkStrategy.createWatermarkGenerator(
                                                 () -> consumerMetricGroup),
                                         immediateOutput,

Reply via email to