aljoscha commented on a change in pull request #12147:
URL: https://github.com/apache/flink/pull/12147#discussion_r425659034
##########
File path:
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
##########
@@ -361,80 +350,37 @@ protected void emitRecordsWithTimestamps(
// timestamps will be of the same size as
records.
long timestamp = getTimestampForRecord(record,
partitionState, kafkaEventTimestamp);
sourceContext.collectWithTimestamp(record,
timestamp);
- if (timestampWatermarkMode ==
PUNCTUATED_WATERMARKS) {
- emitPunctuatedWatermark(record,
timestamp, partitionState);
- }
}
partitionState.setOffset(offset);
}
}
- private void emitPunctuatedWatermark(
- T record,
- long timestamp,
- KafkaTopicPartitionState<KPH> partitionState) {
- final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>
withWatermarksState =
- (KafkaTopicPartitionStateWithPunctuatedWatermarks<T,
KPH>) partitionState;
-
- Watermark newWatermark =
withWatermarksState.checkAndGetNewWatermark(record, timestamp);
-
- // if we also have a new per-partition watermark, check if that
is also a
- // new cross-partition watermark
- if (newWatermark != null) {
- updateMinPunctuatedWatermark(newWatermark);
- }
- }
-
+ // This must be called under the checkpoint lock because we potentially
emit watermarks in
+ // withWatermarkState.onRecord()
protected long getTimestampForRecord(
T record,
KafkaTopicPartitionState<KPH> partitionState,
long kafkaEventTimestamp) {
if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
return kafkaEventTimestamp;
- } else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
- final KafkaTopicPartitionStateWithPeriodicWatermarks<T,
KPH> withWatermarksState =
-
(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>) partitionState;
-
- // extract timestamp - this accesses/modifies the
per-partition state inside the
- // watermark generator instance, so we need to lock the
access on the
- // partition state. concurrent access can happen from
the periodic emitter
- //noinspection
SynchronizationOnLocalVariableOrMethodParameter
- synchronized (withWatermarksState) {
- return
withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
- }
} else {
- final
KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
-
(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) partitionState;
-
- // only one thread ever works on accessing timestamps
and watermarks
- // from the punctuated extractor
- return
withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
- }
- }
+ // must have a WatermarkGenerator
- /**
- *Checks whether a new per-partition watermark is also a new
cross-partition watermark.
- */
- private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
- if (nextWatermark.getTimestamp() > maxWatermarkSoFar) {
- long newMin = Long.MAX_VALUE;
+ final KafkaTopicPartitionStateWithWatermarkGenerator<T,
KPH> withWatermarksState =
+
(KafkaTopicPartitionStateWithWatermarkGenerator<T, KPH>) partitionState;
- for (KafkaTopicPartitionState<?> state :
subscribedPartitionStates) {
- @SuppressWarnings("unchecked")
- final
KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
-
(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) state;
+ //noinspection
SynchronizationOnLocalVariableOrMethodParameter
+ synchronized (withWatermarksState) {
- newMin = Math.min(newMin,
withWatermarksState.getCurrentPartitionWatermark());
- }
+ // You would expect that we don't have to do
this under lock. You would be wrong:
+ // A WatermarkStrategy can wrap an old-style
combined
+ // timestamp extractor/watermark assigner, in
which case the TimestampAssigner and
+ // WatermarkGenerator wrap one and the same
object, where extracting the timestamp
+ // updates the internal state of the assigner.
+ long timestamp =
withWatermarksState.extractTimestamp(record, kafkaEventTimestamp);
- // double-check locking pattern
- if (newMin > maxWatermarkSoFar) {
- synchronized (checkpointLock) {
- if (newMin > maxWatermarkSoFar) {
- maxWatermarkSoFar = newMin;
- sourceContext.emitWatermark(new
Watermark(newMin));
- }
- }
+ withWatermarksState.onEvent(record, timestamp);
Review comment:
Oh boy, you're right. That makes it a bit trickier, though: we can't do it
in this method and will have to acquire the lock again. 👌
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org