aljoscha commented on a change in pull request #12147:
URL: https://github.com/apache/flink/pull/12147#discussion_r426165759
##########
File path:
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
##########
@@ -350,92 +339,38 @@ protected abstract void doCommitInternalOffsetsToKafka(
*/
protected void emitRecordsWithTimestamps(
Queue<T> records,
- KafkaTopicPartitionState<KPH> partitionState,
+ KafkaTopicPartitionState<T, KPH> partitionState,
long offset,
long kafkaEventTimestamp) {
// emit the records, using the checkpoint lock to guarantee
// atomicity of record emission and offset state update
synchronized (checkpointLock) {
T record;
while ((record = records.poll()) != null) {
- // timestamps will be of the same size as
records.
- long timestamp = getTimestampForRecord(record,
partitionState, kafkaEventTimestamp);
- sourceContext.collectWithTimestamp(record,
timestamp);
- if (timestampWatermarkMode ==
PUNCTUATED_WATERMARKS) {
- emitPunctuatedWatermark(record,
timestamp, partitionState);
- }
- }
- partitionState.setOffset(offset);
- }
- }
-
- private void emitPunctuatedWatermark(
- T record,
- long timestamp,
- KafkaTopicPartitionState<KPH> partitionState) {
- final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>
withWatermarksState =
- (KafkaTopicPartitionStateWithPunctuatedWatermarks<T,
KPH>) partitionState;
-
- Watermark newWatermark =
withWatermarksState.checkAndGetNewWatermark(record, timestamp);
-
- // if we also have a new per-partition watermark, check if that
is also a
- // new cross-partition watermark
- if (newWatermark != null) {
- updateMinPunctuatedWatermark(newWatermark);
- }
- }
-
- protected long getTimestampForRecord(
- T record,
- KafkaTopicPartitionState<KPH> partitionState,
- long kafkaEventTimestamp) {
- if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
- return kafkaEventTimestamp;
- } else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
- final KafkaTopicPartitionStateWithPeriodicWatermarks<T,
KPH> withWatermarksState =
-
(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>) partitionState;
-
- // extract timestamp - this accesses/modifies the
per-partition state inside the
- // watermark generator instance, so we need to lock the
access on the
- // partition state. concurrent access can happen from
the periodic emitter
- //noinspection
SynchronizationOnLocalVariableOrMethodParameter
- synchronized (withWatermarksState) {
- return
withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
- }
- } else {
- final
KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
-
(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) partitionState;
-
- // only one thread ever works on accessing timestamps
and watermarks
- // from the punctuated extractor
- return
withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
- }
- }
+ long timestamp;
- /**
- *Checks whether a new per-partition watermark is also a new
cross-partition watermark.
- */
- private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
- if (nextWatermark.getTimestamp() > maxWatermarkSoFar) {
- long newMin = Long.MAX_VALUE;
-
- for (KafkaTopicPartitionState<?> state :
subscribedPartitionStates) {
- @SuppressWarnings("unchecked")
- final
KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
-
(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) state;
-
- newMin = Math.min(newMin,
withWatermarksState.getCurrentPartitionWatermark());
- }
+ //noinspection
SynchronizationOnLocalVariableOrMethodParameter
+ synchronized (partitionState) {
+
+ // You would expect that we don't have
to do this under lock. You would be wrong:
+ // A WatermarkStrategy can wrap an
old-style combined
+ // timestamp extractor/watermark
assigner, in which case the TimestampAssigner and
+ // WatermarkGenerator wrap one and the
same object, where extracting the timestamp
+ // updates the internal state of the
assigner.
+ timestamp =
partitionState.extractTimestamp(record, kafkaEventTimestamp);
+ partitionState.onEvent(record,
timestamp);
Review comment:
Ah, I forgot to remove this `partitionState.onEvent(record, timestamp)` call
here. I had already added the call after element emission... 😅
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]