aljoscha commented on a change in pull request #12147:
URL: https://github.com/apache/flink/pull/12147#discussion_r425659034



##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
##########
@@ -361,80 +350,37 @@ protected void emitRecordsWithTimestamps(
                                // timestamps will be of the same size as 
records.
                                long timestamp = getTimestampForRecord(record, 
partitionState, kafkaEventTimestamp);
                                sourceContext.collectWithTimestamp(record, 
timestamp);
-                               if (timestampWatermarkMode == 
PUNCTUATED_WATERMARKS) {
-                                       emitPunctuatedWatermark(record, 
timestamp, partitionState);
-                               }
                        }
                        partitionState.setOffset(offset);
                }
        }
 
-       private void emitPunctuatedWatermark(
-                       T record,
-                       long timestamp,
-                       KafkaTopicPartitionState<KPH> partitionState) {
-               final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> 
withWatermarksState =
-                       (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, 
KPH>) partitionState;
-
-               Watermark newWatermark = 
withWatermarksState.checkAndGetNewWatermark(record, timestamp);
-
-               // if we also have a new per-partition watermark, check if that 
is also a
-               // new cross-partition watermark
-               if (newWatermark != null) {
-                       updateMinPunctuatedWatermark(newWatermark);
-               }
-       }
-
+       // This must be called under the checkpoint lock because we potentially 
emit watermarks in
+       // withWatermarkState.onRecord()
        protected long getTimestampForRecord(
                        T record,
                        KafkaTopicPartitionState<KPH> partitionState,
                        long kafkaEventTimestamp) {
                if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
                        return kafkaEventTimestamp;
-               } else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
-                       final KafkaTopicPartitionStateWithPeriodicWatermarks<T, 
KPH> withWatermarksState =
-                               
(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>) partitionState;
-
-                       // extract timestamp - this accesses/modifies the 
per-partition state inside the
-                       // watermark generator instance, so we need to lock the 
access on the
-                       // partition state. concurrent access can happen from 
the periodic emitter
-                       //noinspection 
SynchronizationOnLocalVariableOrMethodParameter
-                       synchronized (withWatermarksState) {
-                               return 
withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
-                       }
                } else {
-                       final 
KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
-                               
(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) partitionState;
-
-                       // only one thread ever works on accessing timestamps 
and watermarks
-                       // from the punctuated extractor
-                       return 
withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
-               }
-       }
+                       // must have a WatermarkGenerator
 
-       /**
-        *Checks whether a new per-partition watermark is also a new 
cross-partition watermark.
-        */
-       private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
-               if (nextWatermark.getTimestamp() > maxWatermarkSoFar) {
-                       long newMin = Long.MAX_VALUE;
+                       final KafkaTopicPartitionStateWithWatermarkGenerator<T, 
KPH> withWatermarksState =
+                               
(KafkaTopicPartitionStateWithWatermarkGenerator<T, KPH>) partitionState;
 
-                       for (KafkaTopicPartitionState<?> state : 
subscribedPartitionStates) {
-                               @SuppressWarnings("unchecked")
-                               final 
KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
-                                               
(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) state;
+                       //noinspection 
SynchronizationOnLocalVariableOrMethodParameter
+                       synchronized (withWatermarksState) {
 
-                               newMin = Math.min(newMin, 
withWatermarksState.getCurrentPartitionWatermark());
-                       }
+                               // You would expect that we don't have to do 
this under lock. You would be wrong:
+                               // A WatermarkStrategy can wrap an old-style 
combined
+                               // timestamp extractor/watermark assigner, in 
which case the TimestampAssigner and
+                               // WatermarkGenerator wrap one and the same 
object, where extracting the timestamp
+                               // updates the internal state of the assigner.
+                               long timestamp = 
withWatermarksState.extractTimestamp(record, kafkaEventTimestamp);
 
-                       // double-check locking pattern
-                       if (newMin > maxWatermarkSoFar) {
-                               synchronized (checkpointLock) {
-                                       if (newMin > maxWatermarkSoFar) {
-                                               maxWatermarkSoFar = newMin;
-                                               sourceContext.emitWatermark(new 
Watermark(newMin));
-                                       }
-                               }
+                               withWatermarksState.onEvent(record, timestamp);

Review comment:
       Oh boy, you're right. That makes it a bit trickier, though: we can't do it in this method and will have to acquire the lock again. 👌




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to