yunfengzhou-hub commented on code in PR #23521: URL: https://github.com/apache/flink/pull/23521#discussion_r1361503174
########## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/RecordAttributesBuilder.java: ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.streamrecord; + +import org.apache.flink.annotation.Experimental; + +import javax.annotation.Nullable; + +import java.util.List; + +/** The builder class for {@link RecordAttributes}. */ +@Experimental +public class RecordAttributesBuilder { + private final List<RecordAttributes> lastRecordAttributesOfInputs; + @Nullable private Boolean backlog = null; + + /** + * This constructor takes a list of the last recordAttributes received from each of the + * operator's inputs. When this list is not empty, it will be used to determine the default + * values for those attributes that have not been explicitly set by caller. 
+ */ + public RecordAttributesBuilder(List<RecordAttributes> lastRecordAttributesOfInputs) { + this.lastRecordAttributesOfInputs = lastRecordAttributesOfInputs; + } + + public RecordAttributesBuilder setBacklog(boolean backlog) { + this.backlog = backlog; + return this; + } + + /** + * If any operator attribute is null, we will log it at DEBUG level and determine a non-null + * default value as described below. + * + * <p>Default value for backlog: if any element in lastRecordAttributesOfInputs has + * backlog=true, use true. Otherwise, use false. + */ + public RecordAttributes build() { + if (backlog == null) { + backlog = getDefaultBacklog(lastRecordAttributesOfInputs); Review Comment: Let's add DEBUG level log according to the JavaDoc. ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/RecordAttributesBuilder.java: ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.streamrecord; + +import org.apache.flink.annotation.Experimental; + +import javax.annotation.Nullable; + +import java.util.List; + +/** The builder class for {@link RecordAttributes}. 
*/ +@Experimental +public class RecordAttributesBuilder { + private final List<RecordAttributes> lastRecordAttributesOfInputs; + @Nullable private Boolean backlog = null; + + /** + * This constructor takes a list of the last recordAttributes received from each of the + * operator's inputs. When this list is not empty, it will be used to determine the default + * values for those attributes that have not been explicitly set by caller. + */ + public RecordAttributesBuilder(List<RecordAttributes> lastRecordAttributesOfInputs) { + this.lastRecordAttributesOfInputs = lastRecordAttributesOfInputs; + } + + public RecordAttributesBuilder setBacklog(boolean backlog) { + this.backlog = backlog; + return this; + } + + /** + * If any operator attribute is null, we will log it at DEBUG level and determine a non-null + * default value as described below. + * + * <p>Default value for backlog: if any element in lastRecordAttributesOfInputs has + * backlog=true, use true. Otherwise, use false. + */ + public RecordAttributes build() { + if (backlog == null) { + backlog = getDefaultBacklog(lastRecordAttributesOfInputs); + } + return new RecordAttributes(backlog); + } + + private boolean getDefaultBacklog(List<RecordAttributes> lastRecordAttributesOfInputs) { Review Comment: It might be better to either make this method static or reuse the `lastRecordAttributesOfInputs` variable in the builder class. ########## flink-runtime/src/main/java/org/apache/flink/runtime/source/event/IsBacklogEvent.java: ########## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.source.event; + +import org.apache.flink.runtime.operators.coordination.OperatorEvent; + +/** A source event that notify the source of the backlog status. */ +public class IsBacklogEvent implements OperatorEvent { Review Comment: nit: it might be better to rename it as `BacklogEvent`. ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java: ########## @@ -649,4 +659,52 @@ public OperatorID getOperatorID() { protected Optional<InternalTimeServiceManager<?>> getTimeServiceManager() { return Optional.ofNullable(timeServiceManager); } + + @Experimental + public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception { + lastRecordAttributes1 = recordAttributes; + if (timeServiceManager != null + && timeServiceManager instanceof InternalBacklogAwareTimerServiceManagerImpl) { + final InternalBacklogAwareTimerServiceManagerImpl<?> backlogAwareTimerServiceManager = + (InternalBacklogAwareTimerServiceManagerImpl<?>) timeServiceManager; + if (recordAttributes instanceof InternalRecordAttributes) { + backlogAwareTimerServiceManager.setMaxWatermarkDuringBacklog( + ((InternalRecordAttributes) recordAttributes) + .getMaxWatermarkDuringBacklog()); + } + backlogAwareTimerServiceManager.setBacklog(recordAttributes.isBacklog()); + } + output.emitRecordAttributes( + new RecordAttributesBuilder(Collections.singletonList(recordAttributes)).build()); + } + + @Experimental + public void processRecordAttributes1(RecordAttributes recordAttributes) throws 
Exception { + lastRecordAttributes1 = recordAttributes; + List<RecordAttributes> lastRecordAttributes = getTwoInputsLastRecordAttributes(); + output.emitRecordAttributes(new RecordAttributesBuilder(lastRecordAttributes).build()); + } + + @Experimental + public void processRecordAttributes2(RecordAttributes recordAttributes) throws Exception { + lastRecordAttributes2 = recordAttributes; + List<RecordAttributes> lastRecordAttributes = getTwoInputsLastRecordAttributes(); + output.emitRecordAttributes(new RecordAttributesBuilder(lastRecordAttributes).build()); + } + + private List<RecordAttributes> getTwoInputsLastRecordAttributes() { + List<RecordAttributes> lastRecordAttributes; + if (lastRecordAttributes1 == null && lastRecordAttributes2 == null) { + // should not reach here. + throw new RuntimeException( + "lastRecordAttributes1 and lastRecordAttributes2 cannot be both null."); Review Comment: nit: It might be better to move this condition to the last of these conditions to reduce the average code path. ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalBacklogAwareTimerServiceImpl.java: ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.function.BiConsumerWithException; + +/** + * InternalBacklogAwareTimerServiceImpl uses {@link InternalTimerServiceImpl} for event time timers + * during non-backlog processing and uses {@link BacklogTimeService} for event time timers during + * backlog processing. All the processing time timers are managed by the {@link + * InternalTimerServiceImpl}. + */ +@Internal +public class InternalBacklogAwareTimerServiceImpl<K, N> implements InternalTimerService<N> { + + private final InternalTimerServiceImpl<K, N> realTimeInternalTimeService; + private final BacklogTimeService<K, N> backlogTimeService; + private InternalTimerService<N> currentInternalTimerService; + + public InternalBacklogAwareTimerServiceImpl( + InternalTimerServiceImpl<K, N> realTimeInternalTimeService, + BacklogTimeService<K, N> backlogTimeService) { + this.realTimeInternalTimeService = realTimeInternalTimeService; + this.backlogTimeService = backlogTimeService; + this.currentInternalTimerService = realTimeInternalTimeService; + } + + @Override + public long currentProcessingTime() { + return realTimeInternalTimeService.currentProcessingTime(); + } + + @Override + public long currentWatermark() { + return currentInternalTimerService.currentWatermark(); + } + + @Override + public void registerProcessingTimeTimer(N namespace, long time) { + realTimeInternalTimeService.registerProcessingTimeTimer(namespace, time); + } + + @Override + public void deleteProcessingTimeTimer(N namespace, long time) { + realTimeInternalTimeService.deleteProcessingTimeTimer(namespace, time); + } + + @Override + public void registerEventTimeTimer(N namespace, long time) { + currentInternalTimerService.registerEventTimeTimer(namespace, time); + } + + @Override + public void deleteEventTimeTimer(N namespace, long time) { + currentInternalTimerService.deleteEventTimeTimer(namespace, time); + } + + 
@Override + public void forEachEventTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) + throws Exception { + currentInternalTimerService.forEachEventTimeTimer(consumer); + } + + @Override + public void forEachProcessingTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) + throws Exception { + realTimeInternalTimeService.forEachProcessingTimeTimer(consumer); + } + + public void advanceWatermark(long timestamp) throws Exception { + realTimeInternalTimeService.advanceWatermark(timestamp); + } + + public void setMaxWatermarkDuringBacklog(long timestamp) { + backlogTimeService.setMaxWatermarkDuringBacklog(timestamp); + } + + public void setBacklog(boolean backlog) throws Exception { + if (currentInternalTimerService == backlogTimeService && !backlog) { + // Switch to non backlog + backlogTimeService.setCurrentKey(null); + currentInternalTimerService = realTimeInternalTimeService; + return; Review Comment: It might be better to advance realTimeInternalTimeService's watermark to maxWatermarkDuringBacklog. ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalBacklogAwareTimerServiceManagerImpl.java: ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; +import org.apache.flink.runtime.state.PriorityQueueSetFactory; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.WrappingRuntimeException; + +import java.util.HashMap; +import java.util.Map; + +/** + * InternalBacklogAwareTimerServiceManagerImpl keeps track of all the {@link + * InternalBacklogAwareTimerServiceImpl}. 
+ */ +@Internal +public class InternalBacklogAwareTimerServiceManagerImpl<K> + extends InternalTimeServiceManagerImpl<K> + implements InternalTimeServiceManager<K>, KeyedStateBackend.KeySelectionListener<K> { + + private final Map<String, InternalBacklogAwareTimerServiceImpl<K, ?>> timerServices = + new HashMap<>(); + + private boolean backlog = false; + + InternalBacklogAwareTimerServiceManagerImpl( + KeyGroupRange localKeyGroupRange, + KeyContext keyContext, + PriorityQueueSetFactory priorityQueueSetFactory, + ProcessingTimeService processingTimeService, + StreamTaskCancellationContext cancellationContext) { + super( + localKeyGroupRange, + keyContext, + priorityQueueSetFactory, + processingTimeService, + cancellationContext); + } + + @Override + public <N> InternalTimerService<N> getInternalTimerService( + String name, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + Triggerable<K, N> triggerable) { + + final InternalTimerServiceImpl<K, N> internalTimerService = + (InternalTimerServiceImpl<K, N>) + super.getInternalTimerService( + name, keySerializer, namespaceSerializer, triggerable); + final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue = + internalTimerService.getEventTimeTimersQueue(); + final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> + processingTimeTimersQueue = internalTimerService.getProcessingTimeTimersQueue(); + + final BacklogTimeService<K, N> backlogTimeService = + new BacklogTimeService<>( + processingTimeService, + triggerable, + eventTimeTimersQueue, + processingTimeTimersQueue); + + InternalBacklogAwareTimerServiceImpl<K, N> timerService = + (InternalBacklogAwareTimerServiceImpl<K, N>) timerServices.get(name); + if (timerService == null) { Review Comment: It might be better to move the logics above this line to below this line. 
########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/SortingBacklogDataInput.java: ########## @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.sort; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.AlgorithmOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; +import org.apache.flink.runtime.io.AvailabilityProvider; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.memory.MemoryManager; +import 
org.apache.flink.runtime.operators.sort.ExternalSorter; +import org.apache.flink.runtime.operators.sort.PushSorter; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.io.DataInputStatus; +import org.apache.flink.streaming.runtime.io.StreamTaskInput; +import org.apache.flink.streaming.runtime.streamrecord.InternalRecordAttributes; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; +import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.MutableObjectIterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; + +/** + * SortingBacklogDataInput forwards the input records to output util it switch to backlog + * processing. During backlog processing, it buffers and sorts the input records and outputs the + * sorted records to the downstream when switching to non backlog processing. 
+ */ +@Internal +public class SortingBacklogDataInput<T, K> implements StreamTaskInput<T> { + + private static final Logger LOG = LoggerFactory.getLogger(SortingBacklogDataInput.class); + + private final StreamTaskInput<T> wrappedInput; + private final PushSorter<Tuple2<byte[], StreamRecord<T>>> sorter; + private final KeySelector<T, K> keySelector; + private final TypeSerializer<K> keySerializer; + private final DataOutputSerializer dataOutputSerializer; + private final SortingDataOutput sortingDataOutput; + private final StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecords; + private MutableObjectIterator<Tuple2<byte[], StreamRecord<T>>> sortedInput = null; + private long watermarkSeenDuringBacklog = Long.MIN_VALUE; + + private volatile OperatingMode mode = OperatingMode.PROCESSING_REALTIME; + + private enum OperatingMode { + PROCESSING_REALTIME, + SORTING_BACKLOG, + FLUSHING_BACKLOG Review Comment: It could help improve readability to add brief JavaDocs to these modes and how they convert to each other. ########## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/BacklogTimeServiceTest.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.streaming.api.operators.TimeServiceTestUtils.createBacklogTimerService; +import static org.apache.flink.streaming.api.operators.TimeServiceTestUtils.createTimerQueue; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link BacklogTimeService}. */ +public class BacklogTimeServiceTest { + + @Test + public void testTriggerEventTimeTimer() throws Exception { + List<Long> timers = new ArrayList<>(); + TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); + KeyGroupRange testKeyGroupRange = new KeyGroupRange(0, 1); + final HeapPriorityQueueSetFactory priorityQueueSetFactory = + new HeapPriorityQueueSetFactory(testKeyGroupRange, 1, 128); + + final TimerSerializer<Integer, String> timerSerializer = + new TimerSerializer<>(IntSerializer.INSTANCE, StringSerializer.INSTANCE); + final BacklogTimeService<Integer, String> timeService = + createBacklogTimerService( + processingTimeService, + TestTriggerable.eventTimeTrigger( + (timer) -> timers.add(timer.getTimestamp())), + createTimerQueue( + "processingTimerQueue", timerSerializer, priorityQueueSetFactory), + createTimerQueue( + "eventTimerQueue", timerSerializer, priorityQueueSetFactory)); + + timeService.setCurrentKey(1); + timeService.registerEventTimeTimer("a", 0); + timeService.registerEventTimeTimer("a", 2); + timeService.registerEventTimeTimer("a", 1); + timeService.setMaxWatermarkDuringBacklog(2); Review Comment: nit: Let's also register a timer 
whose timestamp is larger than the max watermark, and verify that this timer will not be emitted when the current key changes. ########## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/SortingBacklogDataInputTest.java: ########## @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.sort; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.operators.testutils.DummyInvokable; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.io.DataInputStatus; +import org.apache.flink.streaming.runtime.streamrecord.InternalRecordAttributes; +import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; + +/** Tests for {@link SortingBacklogDataInput}. */ +public class SortingBacklogDataInputTest { + @Test + public void sortingDuringBacklog() throws Exception { + CollectingDataOutput<Integer> collectingDataOutput = new CollectingDataOutput<>(); + CollectionDataInput<Integer> input = + new CollectionDataInput<>( + Arrays.asList( + new StreamRecord<>(2, 0), + new StreamRecord<>(1, 0), + new RecordAttributesBuilder(Collections.emptyList()) + .setBacklog(true) + .build(), + new StreamRecord<>(1, 3), + new StreamRecord<>(1, 1), + new StreamRecord<>(2, 1), + new StreamRecord<>(2, 3), + new StreamRecord<>(1, 2), + new StreamRecord<>(2, 2), + new RecordAttributesBuilder(Collections.emptyList()) + .setBacklog(false) + .build(), + new StreamRecord<>(1, 4), + new StreamRecord<>(2, 4), + new StreamRecord<>(1, 5))); + MockEnvironment environment = MockEnvironment.builder().build(); + SortingBacklogDataInput<Integer, Integer> sortingDataInput = + new SortingBacklogDataInput<>( + input, + new IntSerializer(), + new IntSerializer(), + 
(KeySelector<Integer, Integer>) value -> value, + environment.getMemoryManager(), + environment.getIOManager(), + true, + 1.0, + new Configuration(), + new DummyInvokable(), + new ExecutionConfig(), + () -> true); + + DataInputStatus inputStatus; + do { + inputStatus = sortingDataInput.emitNext(collectingDataOutput); + } while (inputStatus != DataInputStatus.END_OF_INPUT); + + org.assertj.core.api.Assertions.assertThat(collectingDataOutput.events) + .containsExactly( + new StreamRecord<>(2, 0), + new StreamRecord<>(1, 0), + new RecordAttributesBuilder(Collections.emptyList()) + .setBacklog(true) + .build(), + new InternalRecordAttributes(true, Long.MIN_VALUE), + new StreamRecord<>(1, 1), + new StreamRecord<>(1, 2), + new StreamRecord<>(1, 3), + new StreamRecord<>(2, 1), + new StreamRecord<>(2, 2), + new StreamRecord<>(2, 3), + new RecordAttributesBuilder(Collections.emptyList()) + .setBacklog(false) + .build(), + new StreamRecord<>(1, 4), + new StreamRecord<>(2, 4), + new StreamRecord<>(1, 5)); + } + + @Test + public void watermarkPropagation() throws Exception { + CollectingDataOutput<Integer> collectingDataOutput = new CollectingDataOutput<>(); + CollectionDataInput<Integer> input = + new CollectionDataInput<>( + Arrays.asList( + new StreamRecord<>(1, 3), + new Watermark(1), + new RecordAttributesBuilder(Collections.emptyList()) + .setBacklog(true) + .build(), + new StreamRecord<>(1, 1), + new Watermark(2), + new StreamRecord<>(2, 1), + new Watermark(3), + new StreamRecord<>(2, 3), + new Watermark(4), + new StreamRecord<>(1, 2), + new Watermark(5), + new StreamRecord<>(2, 2), + new Watermark(6), + new RecordAttributesBuilder(Collections.emptyList()) + .setBacklog(false) + .build())); + MockEnvironment environment = MockEnvironment.builder().build(); + SortingBacklogDataInput<Integer, Integer> sortingDataInput = + new SortingBacklogDataInput<>( + input, + new IntSerializer(), + new IntSerializer(), + (KeySelector<Integer, Integer>) value -> value, + 
environment.getMemoryManager(), + environment.getIOManager(), + true, + 1.0, + new Configuration(), + new DummyInvokable(), + new ExecutionConfig(), + () -> true); + + DataInputStatus inputStatus; + do { + inputStatus = sortingDataInput.emitNext(collectingDataOutput); + } while (inputStatus != DataInputStatus.END_OF_INPUT); + + org.assertj.core.api.Assertions.assertThat(collectingDataOutput.events) + .containsExactly( + new StreamRecord<>(1, 3), + new Watermark(1), + new RecordAttributesBuilder(Collections.emptyList()) + .setBacklog(true) + .build(), + new InternalRecordAttributes(true, 6L), + new StreamRecord<>(1, 1), + new StreamRecord<>(1, 2), + new StreamRecord<>(2, 1), + new StreamRecord<>(2, 2), + new StreamRecord<>(2, 3), + new RecordAttributesBuilder(Collections.emptyList()) + .setBacklog(false) + .build(), + new Watermark(6)); + } + + @Test + public void simpleVariableLengthKeySorting() throws Exception { Review Comment: What does this test case verify in addition to other test cases? ########## flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/StreamingWithBacklogITCase.java: ########## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.streaming.api.datastream; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.hybrid.HybridSource; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.WindowFunction; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.CollectionUtil; + +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.List; + +import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL_DURING_BACKLOG; +import static org.assertj.core.api.Assertions.assertThat; + +/** Integration test for streaming job with backlog. 
*/ +public class StreamingWithBacklogITCase { + @Test + void testKeyedAggregationWithBacklog() throws Exception { + final Configuration config = new Configuration(); + config.set(CHECKPOINTING_INTERVAL_DURING_BACKLOG, Duration.ZERO); + final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(config); + env.setParallelism(1); Review Comment: It might be better not to require users to set parallelism to 1 if they want to use this functionality. ########## flink-runtime/src/main/java/org/apache/flink/runtime/source/event/IsBacklogEvent.java: ########## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.source.event; + +import org.apache.flink.runtime.operators.coordination.OperatorEvent; + +/** A source event that notify the source of the backlog status. */ Review Comment: nit: "the source" -> "the source operator" ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/InternalRecordAttributes.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.streamrecord; + +import org.apache.flink.annotation.Internal; + +import java.util.Objects; + +/** The RecordAttributes that contains extra information to be used internally. */ +@Internal +public class InternalRecordAttributes extends RecordAttributes { + private final long maxWatermarkDuringBacklog; + + public InternalRecordAttributes(boolean backlog, long backlogWatermark) { Review Comment: It might be better to rename `backlogWatermark` to `maxWatermarkDuringBacklog`. ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/InternalRecordAttributes.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.streamrecord; + +import org.apache.flink.annotation.Internal; + +import java.util.Objects; + +/** The RecordAttributes that contains extra information to be used internally. */ +@Internal +public class InternalRecordAttributes extends RecordAttributes { + private final long maxWatermarkDuringBacklog; + + public InternalRecordAttributes(boolean backlog, long backlogWatermark) { + super(backlog); + this.maxWatermarkDuringBacklog = backlogWatermark; + } + + public long getMaxWatermarkDuringBacklog() { + return maxWatermarkDuringBacklog; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + InternalRecordAttributes that = (InternalRecordAttributes) o; + return maxWatermarkDuringBacklog == that.maxWatermarkDuringBacklog; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), maxWatermarkDuringBacklog); + } + + @Override + public String toString() { + return "InternalRecordAttributes{" + + "watermarkDuringBacklog=" + + maxWatermarkDuringBacklog Review Comment: Let's add backlog's value to the string representation as well. ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BacklogTimeService.java: ########## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; +import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeService; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; + +import java.util.LinkedList; +import java.util.List; + +/** + * An implementation of a {@link InternalTimerService} that manages timers with a single active key + * at a time. This is used by {@link + * org.apache.flink.streaming.api.operators.InternalBacklogAwareTimerServiceImpl} during backlog + * processing. 
+ */ +@Internal +public class BacklogTimeService<K, N> extends BatchExecutionInternalTimeService<K, N> { + private long maxWatermarkDuringBacklog; + + public BacklogTimeService( + ProcessingTimeService processingTimeService, + Triggerable<K, N> triggerTarget, + KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue, + KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> + processingTimeTimersQueue) { + super( + processingTimeService, + triggerTarget, + eventTimeTimersQueue, + processingTimeTimersQueue); + } + + @Override + public void registerProcessingTimeTimer(N namespace, long time) { Review Comment: Let's override `deleteProcessingTimeTimer` as well. ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java: ########## Review Comment: We should also add RecordAttributes serialization process in `StreamElement deserialize(StreamElement reuse, DataInputView source)` and `copy(DataInputView source, DataOutputView target)`. Let's also add tests for RecordAttributes in `StreamElementSerializerTest`. ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java: ########## @@ -569,6 +571,12 @@ public void handleOperatorEvent(OperatorEvent event) { sourceReader.handleSourceEvents(((SourceEventWrapper) event).getSourceEvent()); } else if (event instanceof NoMoreSplitsEvent) { sourceReader.notifyNoMoreSplits(); + } else if (event instanceof IsBacklogEvent) { + eventTimeLogic.triggerPeriodicEmit(System.currentTimeMillis()); Review Comment: Should we only invoke this method when `event.isBacklog() == true`, and cancel the previously scheduled periodicEmit in `eventTimeLogic.startPeriodicWatermarkEmits`? 
########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalBacklogAwareTimerServiceManagerImpl.java: ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; +import org.apache.flink.runtime.state.PriorityQueueSetFactory; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.WrappingRuntimeException; + +import java.util.HashMap; +import java.util.Map; + +/** + * 
InternalBacklogAwareTimerServiceManagerImpl keeps track of all the {@link + * InternalBacklogAwareTimerServiceImpl}. + */ +@Internal +public class InternalBacklogAwareTimerServiceManagerImpl<K> + extends InternalTimeServiceManagerImpl<K> + implements InternalTimeServiceManager<K>, KeyedStateBackend.KeySelectionListener<K> { + + private final Map<String, InternalBacklogAwareTimerServiceImpl<K, ?>> timerServices = + new HashMap<>(); + + private boolean backlog = false; + + InternalBacklogAwareTimerServiceManagerImpl( + KeyGroupRange localKeyGroupRange, + KeyContext keyContext, + PriorityQueueSetFactory priorityQueueSetFactory, + ProcessingTimeService processingTimeService, + StreamTaskCancellationContext cancellationContext) { + super( + localKeyGroupRange, + keyContext, + priorityQueueSetFactory, + processingTimeService, + cancellationContext); + } + + @Override + public <N> InternalTimerService<N> getInternalTimerService( + String name, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + Triggerable<K, N> triggerable) { + + final InternalTimerServiceImpl<K, N> internalTimerService = + (InternalTimerServiceImpl<K, N>) + super.getInternalTimerService( + name, keySerializer, namespaceSerializer, triggerable); + final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue = + internalTimerService.getEventTimeTimersQueue(); + final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> + processingTimeTimersQueue = internalTimerService.getProcessingTimeTimersQueue(); + + final BacklogTimeService<K, N> backlogTimeService = + new BacklogTimeService<>( + processingTimeService, + triggerable, + eventTimeTimersQueue, + processingTimeTimersQueue); Review Comment: Would it be simpler to pass null or an empty queue as `processingTimeTimersQueue`? 
########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorAttributesBuilder.java: ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Experimental; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +/** The builder class for {@link OperatorAttributes}. */ +@Experimental +public class OperatorAttributesBuilder { + + private static final Logger LOG = LoggerFactory.getLogger(OperatorAttributesBuilder.class); + + @Nullable private Boolean internalSorterSupported = null; + + public OperatorAttributesBuilder() {} + + public OperatorAttributesBuilder setInternalSorterSupported(boolean internalSorterSupported) { Review Comment: This method seems unused yet. Let's invoke this method in the corresponding `getOperatorAttributes` method in KeyedCoProcessOperator and IntervalJoinOperator according to the FLIP's description. 
########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java: ########## @@ -795,6 +795,8 @@ public enum InputRequirement { */ SORTED, + SORTED_DURING_BACKLOG, Review Comment: Let's add a JavaDoc for this enum value, like that in `SORTED` and `PASS_THROUGH`. Besides, it might be better to add newly introduced enum values to the end of a enum class, otherwise the ordinal of enum values after the newly introduced one (PASS_THROUGH -> 1) might be changed (PASS_THROUGH -> 2). ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java: ########## @@ -649,4 +659,52 @@ public OperatorID getOperatorID() { protected Optional<InternalTimeServiceManager<?>> getTimeServiceManager() { return Optional.ofNullable(timeServiceManager); } + + @Experimental + public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception { + lastRecordAttributes1 = recordAttributes; + if (timeServiceManager != null + && timeServiceManager instanceof InternalBacklogAwareTimerServiceManagerImpl) { + final InternalBacklogAwareTimerServiceManagerImpl<?> backlogAwareTimerServiceManager = + (InternalBacklogAwareTimerServiceManagerImpl<?>) timeServiceManager; + if (recordAttributes instanceof InternalRecordAttributes) { + backlogAwareTimerServiceManager.setMaxWatermarkDuringBacklog( + ((InternalRecordAttributes) recordAttributes) + .getMaxWatermarkDuringBacklog()); + } + backlogAwareTimerServiceManager.setBacklog(recordAttributes.isBacklog()); + } + output.emitRecordAttributes( + new RecordAttributesBuilder(Collections.singletonList(recordAttributes)).build()); + } + + @Experimental + public void processRecordAttributes1(RecordAttributes recordAttributes) throws Exception { + lastRecordAttributes1 = recordAttributes; + List<RecordAttributes> lastRecordAttributes = getTwoInputsLastRecordAttributes(); + output.emitRecordAttributes(new RecordAttributesBuilder(lastRecordAttributes).build()); 
Review Comment: Should we invoke methods like `backlogAwareTimerServiceManager.setBacklog`, like in `processRecordAttributes`? ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java: ########## @@ -64,7 +64,8 @@ protected Collection<Integer> translateForBatchInternal( IntStream.range(0, inputs.size()) .mapToObj( idx -> { - if (keySelectors.get(idx) != null) { + if (keySelectors.get(idx) != null + && !transformation.isInternalSorterSupported()) { return StreamConfig.InputRequirement.SORTED; } else { Review Comment: Should we support a third condition here? ``` else if (keySelectors.get(idx) != null && transformation.isInternalSorterSupported()) { return StreamConfig.InputRequirement.SORTED_DURING_BACKLOG; } ``` ########## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalBacklogAwareTimerServiceImplTest.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.streaming.api.operators.TimeServiceTestUtils.createBacklogTimerService; +import static org.apache.flink.streaming.api.operators.TimeServiceTestUtils.createInternalTimerService; +import static org.apache.flink.streaming.api.operators.TimeServiceTestUtils.createTimerQueue; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link InternalBacklogAwareTimerServiceImpl}. */ +class InternalBacklogAwareTimerServiceImplTest { Review Comment: nit: public class ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/SortingBacklogDataInput.java: ########## @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.sort; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.AlgorithmOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; +import org.apache.flink.runtime.io.AvailabilityProvider; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.sort.ExternalSorter; +import org.apache.flink.runtime.operators.sort.PushSorter; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.io.DataInputStatus; +import org.apache.flink.streaming.runtime.io.StreamTaskInput; +import org.apache.flink.streaming.runtime.streamrecord.InternalRecordAttributes; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; +import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; +import org.apache.flink.util.ExceptionUtils; +import 
org.apache.flink.util.MutableObjectIterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; + +/** + * SortingBacklogDataInput forwards the input records to output util it switch to backlog + * processing. During backlog processing, it buffers and sorts the input records and outputs the + * sorted records to the downstream when switching to non backlog processing. + */ +@Internal +public class SortingBacklogDataInput<T, K> implements StreamTaskInput<T> { + + private static final Logger LOG = LoggerFactory.getLogger(SortingBacklogDataInput.class); + + private final StreamTaskInput<T> wrappedInput; + private final PushSorter<Tuple2<byte[], StreamRecord<T>>> sorter; + private final KeySelector<T, K> keySelector; + private final TypeSerializer<K> keySerializer; + private final DataOutputSerializer dataOutputSerializer; + private final SortingDataOutput sortingDataOutput; + private final StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecords; + private MutableObjectIterator<Tuple2<byte[], StreamRecord<T>>> sortedInput = null; + private long watermarkSeenDuringBacklog = Long.MIN_VALUE; + + private volatile OperatingMode mode = OperatingMode.PROCESSING_REALTIME; + + private enum OperatingMode { + PROCESSING_REALTIME, + SORTING_BACKLOG, + FLUSHING_BACKLOG + } + + public SortingBacklogDataInput( + StreamTaskInput<T> wrappedInput, + TypeSerializer<T> typeSerializer, + TypeSerializer<K> keySerializer, + KeySelector<T, K> keySelector, + MemoryManager memoryManager, + IOManager ioManager, + boolean objectReuse, + double managedMemoryFraction, + Configuration taskManagerConfiguration, + TaskInvokable containingTask, + ExecutionConfig executionConfig, + StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) { + try { + this.canEmitBatchOfRecords = canEmitBatchOfRecords; + this.sortingDataOutput = new 
SortingDataOutput(); + this.keySelector = keySelector; + this.keySerializer = keySerializer; + int keyLength = keySerializer.getLength(); + final TypeComparator<Tuple2<byte[], StreamRecord<T>>> comparator; + if (keyLength > 0) { + this.dataOutputSerializer = new DataOutputSerializer(keyLength); + comparator = new FixedLengthByteKeyComparator<>(keyLength); + } else { + this.dataOutputSerializer = new DataOutputSerializer(64); + comparator = new VariableLengthByteKeyComparator<>(); + } + KeyAndValueSerializer<T> keyAndValueSerializer = + new KeyAndValueSerializer<>(typeSerializer, keyLength); + this.wrappedInput = wrappedInput; + this.sorter = + ExternalSorter.newBuilder( + memoryManager, + containingTask, + keyAndValueSerializer, + comparator, + executionConfig) + .memoryFraction(managedMemoryFraction) + .enableSpilling( + ioManager, + taskManagerConfiguration.get( + AlgorithmOptions.SORT_SPILLING_THRESHOLD)) + .maxNumFileHandles( + taskManagerConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN)) + .objectReuse(objectReuse) + .largeRecords( + taskManagerConfiguration.get( + AlgorithmOptions.USE_LARGE_RECORDS_HANDLER)) + .build(); + } catch (MemoryAllocationException e) { + throw new RuntimeException(e); + } + } + + @Override + public int getInputIndex() { + return wrappedInput.getInputIndex(); + } + + @Override + public CompletableFuture<Void> prepareSnapshot( + ChannelStateWriter channelStateWriter, long checkpointId) throws CheckpointException { + if (mode != OperatingMode.PROCESSING_REALTIME) { + throw new UnsupportedOperationException( + "Checkpoints are not supported during backlog."); + } + return wrappedInput.prepareSnapshot(channelStateWriter, checkpointId); + } + + @Override + public void close() throws IOException { + IOException ex = null; + try { + wrappedInput.close(); + } catch (IOException e) { + ex = ExceptionUtils.firstOrSuppressed(e, ex); + } + + try { + sorter.close(); + } catch (IOException e) { + ex = ExceptionUtils.firstOrSuppressed(e, ex); + } 
+ + if (ex != null) { + throw ex; + } + } + + @Override + public DataInputStatus emitNext(DataOutput<T> output) throws Exception { + LOG.debug("Emit next, current mode: {}", mode); + if (sortingDataOutput.innerOutput != output) { + sortingDataOutput.innerOutput = output; + } + + if (mode == OperatingMode.PROCESSING_REALTIME) { + return wrappedInput.emitNext(sortingDataOutput); + } + + if (mode == OperatingMode.SORTING_BACKLOG) { + return wrappedInput.emitNext(sortingDataOutput); + } + + if (mode == OperatingMode.FLUSHING_BACKLOG) { + while (true) { + final DataInputStatus status = emitNextSortedRecord(output); + if (status == DataInputStatus.MORE_AVAILABLE + && canEmitBatchOfRecords.check() + && mode == OperatingMode.FLUSHING_BACKLOG) { + continue; + } + return status; + } + } + + // Should never reach here + throw new RuntimeException(String.format("Unknown OperatingMode %s", mode)); + } + + @Nonnull + private DataInputStatus emitNextSortedRecord(DataOutput<T> output) throws Exception { + Tuple2<byte[], StreamRecord<T>> next = sortedInput.next(); + if (next != null) { + output.emitRecord(next.f1); + } else { + // Finished flushing + mode = OperatingMode.PROCESSING_REALTIME; + + // Send backlog=false downstream + output.emitRecordAttributes( + new RecordAttributesBuilder(Collections.emptyList()).setBacklog(false).build()); + + if (watermarkSeenDuringBacklog > Long.MIN_VALUE) { + output.emitWatermark(new Watermark(watermarkSeenDuringBacklog)); + } + } + return DataInputStatus.MORE_AVAILABLE; + } + + @Override + public CompletableFuture<?> getAvailableFuture() { + if (mode == OperatingMode.FLUSHING_BACKLOG) { + return AvailabilityProvider.AVAILABLE; + } else { + return wrappedInput.getAvailableFuture(); + } + } + + private class SortingDataOutput implements DataOutput<T> { + + private DataOutput<T> innerOutput; + + @Override + public void emitRecord(StreamRecord<T> streamRecord) throws Exception { + LOG.debug("Emit record {}", streamRecord.getValue()); + if (mode == 
OperatingMode.PROCESSING_REALTIME) { + innerOutput.emitRecord(streamRecord); + return; + } + + if (mode == OperatingMode.SORTING_BACKLOG) { + K key = keySelector.getKey(streamRecord.getValue()); + + keySerializer.serialize(key, dataOutputSerializer); + byte[] serializedKey = dataOutputSerializer.getCopyOfBuffer(); + dataOutputSerializer.clear(); + + sorter.writeRecord(Tuple2.of(serializedKey, streamRecord)); + return; + } + + if (mode == OperatingMode.FLUSHING_BACKLOG) { + throw new RuntimeException("Unexpected StreamRecord during FLUSHING_BACKLOG."); + } + } + + @Override + public void emitWatermark(Watermark watermark) throws Exception { + if (mode == OperatingMode.PROCESSING_REALTIME) { + innerOutput.emitWatermark(watermark); + } else { + watermarkSeenDuringBacklog = + Math.max(watermarkSeenDuringBacklog, watermark.getTimestamp()); + } + } + + @Override + public void emitWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception { + if (mode == OperatingMode.PROCESSING_REALTIME) { + innerOutput.emitWatermarkStatus(watermarkStatus); + } + + // Ignore watermark status during backlog + } + + @Override + public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception { + if (mode == OperatingMode.PROCESSING_REALTIME) { + innerOutput.emitLatencyMarker(latencyMarker); + } + + // Ignore watermark status during backlog Review Comment: nit: ignore latency marker. ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/SortingBacklogDataInput.java: ########## @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.sort; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.AlgorithmOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; +import org.apache.flink.runtime.io.AvailabilityProvider; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.sort.ExternalSorter; +import org.apache.flink.runtime.operators.sort.PushSorter; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.io.DataInputStatus; +import org.apache.flink.streaming.runtime.io.StreamTaskInput; +import org.apache.flink.streaming.runtime.streamrecord.InternalRecordAttributes; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; +import 
org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.MutableObjectIterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; + +/** + * SortingBacklogDataInput forwards the input records to output util it switch to backlog + * processing. During backlog processing, it buffers and sorts the input records and outputs the + * sorted records to the downstream when switching to non backlog processing. + */ +@Internal +public class SortingBacklogDataInput<T, K> implements StreamTaskInput<T> { + + private static final Logger LOG = LoggerFactory.getLogger(SortingBacklogDataInput.class); + + private final StreamTaskInput<T> wrappedInput; + private final PushSorter<Tuple2<byte[], StreamRecord<T>>> sorter; + private final KeySelector<T, K> keySelector; + private final TypeSerializer<K> keySerializer; + private final DataOutputSerializer dataOutputSerializer; + private final SortingDataOutput sortingDataOutput; + private final StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecords; + private MutableObjectIterator<Tuple2<byte[], StreamRecord<T>>> sortedInput = null; + private long watermarkSeenDuringBacklog = Long.MIN_VALUE; + + private volatile OperatingMode mode = OperatingMode.PROCESSING_REALTIME; + + private enum OperatingMode { + PROCESSING_REALTIME, + SORTING_BACKLOG, + FLUSHING_BACKLOG + } + + public SortingBacklogDataInput( + StreamTaskInput<T> wrappedInput, + TypeSerializer<T> typeSerializer, + TypeSerializer<K> keySerializer, + KeySelector<T, K> keySelector, + MemoryManager memoryManager, + IOManager 
ioManager, + boolean objectReuse, + double managedMemoryFraction, + Configuration taskManagerConfiguration, + TaskInvokable containingTask, + ExecutionConfig executionConfig, + StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) { + try { + this.canEmitBatchOfRecords = canEmitBatchOfRecords; + this.sortingDataOutput = new SortingDataOutput(); + this.keySelector = keySelector; + this.keySerializer = keySerializer; + int keyLength = keySerializer.getLength(); + final TypeComparator<Tuple2<byte[], StreamRecord<T>>> comparator; + if (keyLength > 0) { + this.dataOutputSerializer = new DataOutputSerializer(keyLength); + comparator = new FixedLengthByteKeyComparator<>(keyLength); + } else { + this.dataOutputSerializer = new DataOutputSerializer(64); + comparator = new VariableLengthByteKeyComparator<>(); + } + KeyAndValueSerializer<T> keyAndValueSerializer = + new KeyAndValueSerializer<>(typeSerializer, keyLength); + this.wrappedInput = wrappedInput; + this.sorter = + ExternalSorter.newBuilder( + memoryManager, + containingTask, + keyAndValueSerializer, + comparator, + executionConfig) + .memoryFraction(managedMemoryFraction) + .enableSpilling( + ioManager, + taskManagerConfiguration.get( + AlgorithmOptions.SORT_SPILLING_THRESHOLD)) + .maxNumFileHandles( + taskManagerConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN)) + .objectReuse(objectReuse) + .largeRecords( + taskManagerConfiguration.get( + AlgorithmOptions.USE_LARGE_RECORDS_HANDLER)) + .build(); + } catch (MemoryAllocationException e) { + throw new RuntimeException(e); + } + } + + @Override + public int getInputIndex() { + return wrappedInput.getInputIndex(); + } + + @Override + public CompletableFuture<Void> prepareSnapshot( + ChannelStateWriter channelStateWriter, long checkpointId) throws CheckpointException { + if (mode != OperatingMode.PROCESSING_REALTIME) { + throw new UnsupportedOperationException( + "Checkpoints are not supported during backlog."); + } + return 
wrappedInput.prepareSnapshot(channelStateWriter, checkpointId); + } + + @Override + public void close() throws IOException { + IOException ex = null; + try { + wrappedInput.close(); + } catch (IOException e) { + ex = ExceptionUtils.firstOrSuppressed(e, ex); + } + + try { + sorter.close(); + } catch (IOException e) { + ex = ExceptionUtils.firstOrSuppressed(e, ex); + } + + if (ex != null) { + throw ex; + } + } + + @Override + public DataInputStatus emitNext(DataOutput<T> output) throws Exception { + LOG.debug("Emit next, current mode: {}", mode); + if (sortingDataOutput.innerOutput != output) { + sortingDataOutput.innerOutput = output; + } + + if (mode == OperatingMode.PROCESSING_REALTIME) { + return wrappedInput.emitNext(sortingDataOutput); + } + + if (mode == OperatingMode.SORTING_BACKLOG) { + return wrappedInput.emitNext(sortingDataOutput); + } + + if (mode == OperatingMode.FLUSHING_BACKLOG) { + while (true) { + final DataInputStatus status = emitNextSortedRecord(output); + if (status == DataInputStatus.MORE_AVAILABLE + && canEmitBatchOfRecords.check() + && mode == OperatingMode.FLUSHING_BACKLOG) { + continue; + } + return status; + } + } + + // Should never reach here + throw new RuntimeException(String.format("Unknown OperatingMode %s", mode)); + } + + @Nonnull + private DataInputStatus emitNextSortedRecord(DataOutput<T> output) throws Exception { + Tuple2<byte[], StreamRecord<T>> next = sortedInput.next(); + if (next != null) { + output.emitRecord(next.f1); + } else { + // Finished flushing + mode = OperatingMode.PROCESSING_REALTIME; + + // Send backlog=false downstream + output.emitRecordAttributes( + new RecordAttributesBuilder(Collections.emptyList()).setBacklog(false).build()); + + if (watermarkSeenDuringBacklog > Long.MIN_VALUE) { + output.emitWatermark(new Watermark(watermarkSeenDuringBacklog)); + } + } + return DataInputStatus.MORE_AVAILABLE; + } + + @Override + public CompletableFuture<?> getAvailableFuture() { + if (mode == 
OperatingMode.FLUSHING_BACKLOG) { + return AvailabilityProvider.AVAILABLE; + } else { + return wrappedInput.getAvailableFuture(); + } + } + + private class SortingDataOutput implements DataOutput<T> { + + private DataOutput<T> innerOutput; + + @Override + public void emitRecord(StreamRecord<T> streamRecord) throws Exception { + LOG.debug("Emit record {}", streamRecord.getValue()); + if (mode == OperatingMode.PROCESSING_REALTIME) { + innerOutput.emitRecord(streamRecord); + return; + } + + if (mode == OperatingMode.SORTING_BACKLOG) { + K key = keySelector.getKey(streamRecord.getValue()); + + keySerializer.serialize(key, dataOutputSerializer); + byte[] serializedKey = dataOutputSerializer.getCopyOfBuffer(); + dataOutputSerializer.clear(); + + sorter.writeRecord(Tuple2.of(serializedKey, streamRecord)); + return; + } + + if (mode == OperatingMode.FLUSHING_BACKLOG) { + throw new RuntimeException("Unexpected StreamRecord during FLUSHING_BACKLOG."); + } Review Comment: It might be better to throw exception here regardless of OperatingMode value. This helps to handle cases when a new OperatingMode is introduced. ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/StreamExecutionUtils.java: ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.translators; + +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.configuration.ExecutionOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.graph.StreamNode; +import org.apache.flink.streaming.api.graph.TransformationTranslator; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL_DURING_BACKLOG; + +/** A utility class for applying sorting inputs during backlog processing. */ +public class StreamExecutionUtils { Review Comment: The name of this class is more general than the JavaDoc description. It might be better to modify the JavaDoc to avoid mentioning backlog. ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java: ########## @@ -415,6 +417,12 @@ private void configureStreamGraphStreaming(final StreamGraph graph) { graph.setCheckpointStorage(checkpointStorage); graph.setSavepointDirectory(savepointDir); graph.setGlobalStreamExchangeMode(deriveGlobalStreamExchangeModeStreaming()); + + if (Duration.ZERO.equals( + configuration.get( + ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL_DURING_BACKLOG))) { + graph.setTimerServiceProvider(InternalBacklogAwareTimerServiceManagerImpl::create); + } Review Comment: Why do we need to add a timer service for backlog processing here, while before this PR there is no timer service needed in this method? 
########## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalBacklogAwareTimerServiceImplTest.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.streaming.api.operators.TimeServiceTestUtils.createBacklogTimerService; +import static org.apache.flink.streaming.api.operators.TimeServiceTestUtils.createInternalTimerService; +import static org.apache.flink.streaming.api.operators.TimeServiceTestUtils.createTimerQueue; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link 
InternalBacklogAwareTimerServiceImpl}. */ +class InternalBacklogAwareTimerServiceImplTest { + + @Test + void testTriggerEventTimeTimer() throws Exception { + KeyGroupRange testKeyGroupRange = new KeyGroupRange(0, 1); + List<Tuple2<Integer, Long>> timers = new ArrayList<>(); + TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); + final HeapPriorityQueueSetFactory priorityQueueSetFactory = + new HeapPriorityQueueSetFactory(testKeyGroupRange, 1, 128); + TimerSerializer<Integer, String> timerSerializer = + new TimerSerializer<>(IntSerializer.INSTANCE, StringSerializer.INSTANCE); + final TestTriggerable<Integer, String> triggerable = + TestTriggerable.eventTimeTrigger( + (timer) -> timers.add(Tuple2.of(timer.getKey(), timer.getTimestamp()))); + final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<Integer, String>> + eventTimersQueue = + createTimerQueue( + "eventTimersQueue", timerSerializer, priorityQueueSetFactory); + final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<Integer, String>> + processingTimerQueue = + createTimerQueue( + "processingTimerQueue", timerSerializer, priorityQueueSetFactory); + final BacklogTimeService<Integer, String> backlogTimeService = + createBacklogTimerService( + processingTimeService, triggerable, processingTimerQueue, eventTimersQueue); + final TestKeyContext keyContext = new TestKeyContext(); + final InternalTimerServiceImpl<Integer, String> internalTimerService = + createInternalTimerService( + testKeyGroupRange, + keyContext, + processingTimeService, + processingTimerQueue, + eventTimersQueue); + internalTimerService.startTimerService( + IntSerializer.INSTANCE, StringSerializer.INSTANCE, triggerable); + + final InternalBacklogAwareTimerServiceImpl<Integer, String> timerService = + new InternalBacklogAwareTimerServiceImpl<>( + internalTimerService, backlogTimeService); + + keyContext.setCurrentKey(1); + timerService.registerEventTimeTimer("a", 2); + 
timerService.registerEventTimeTimer("a", 1); + timerService.registerEventTimeTimer("a", 3); + keyContext.setCurrentKey(2); + timerService.registerEventTimeTimer("a", 3); + timerService.registerEventTimeTimer("a", 1); + timerService.advanceWatermark(2); + assertThat(timers).containsExactly(Tuple2.of(1, 1L), Tuple2.of(2, 1L), Tuple2.of(1, 2L)); + timers.clear(); + + // switch to backlog processing + timerService.setBacklog(true); + timerService.setMaxWatermarkDuringBacklog(5); Review Comment: `AbstractStreamOperator.processRecordAttributes` would set the watermark before setting backlog, while the order in this test is reversed. It might be better to keep the same order as in production code. ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/SortingBacklogDataInput.java: ########## @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.streaming.api.operators.sort; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.AlgorithmOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; +import org.apache.flink.runtime.io.AvailabilityProvider; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.sort.ExternalSorter; +import org.apache.flink.runtime.operators.sort.PushSorter; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.io.DataInputStatus; +import org.apache.flink.streaming.runtime.io.StreamTaskInput; +import org.apache.flink.streaming.runtime.streamrecord.InternalRecordAttributes; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; +import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.MutableObjectIterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import 
javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; + +/** + * SortingBacklogDataInput forwards the input records to output util it switch to backlog + * processing. During backlog processing, it buffers and sorts the input records and outputs the + * sorted records to the downstream when switching to non backlog processing. + */ +@Internal +public class SortingBacklogDataInput<T, K> implements StreamTaskInput<T> { + + private static final Logger LOG = LoggerFactory.getLogger(SortingBacklogDataInput.class); + + private final StreamTaskInput<T> wrappedInput; + private final PushSorter<Tuple2<byte[], StreamRecord<T>>> sorter; + private final KeySelector<T, K> keySelector; + private final TypeSerializer<K> keySerializer; + private final DataOutputSerializer dataOutputSerializer; + private final SortingDataOutput sortingDataOutput; + private final StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecords; + private MutableObjectIterator<Tuple2<byte[], StreamRecord<T>>> sortedInput = null; + private long watermarkSeenDuringBacklog = Long.MIN_VALUE; + + private volatile OperatingMode mode = OperatingMode.PROCESSING_REALTIME; + + private enum OperatingMode { + PROCESSING_REALTIME, + SORTING_BACKLOG, + FLUSHING_BACKLOG + } + + public SortingBacklogDataInput( + StreamTaskInput<T> wrappedInput, + TypeSerializer<T> typeSerializer, + TypeSerializer<K> keySerializer, + KeySelector<T, K> keySelector, + MemoryManager memoryManager, + IOManager ioManager, + boolean objectReuse, + double managedMemoryFraction, + Configuration taskManagerConfiguration, + TaskInvokable containingTask, + ExecutionConfig executionConfig, + StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) { + try { + this.canEmitBatchOfRecords = canEmitBatchOfRecords; + this.sortingDataOutput = new SortingDataOutput(); + this.keySelector = keySelector; + this.keySerializer = keySerializer; + int keyLength = 
keySerializer.getLength(); + final TypeComparator<Tuple2<byte[], StreamRecord<T>>> comparator; + if (keyLength > 0) { + this.dataOutputSerializer = new DataOutputSerializer(keyLength); + comparator = new FixedLengthByteKeyComparator<>(keyLength); + } else { + this.dataOutputSerializer = new DataOutputSerializer(64); + comparator = new VariableLengthByteKeyComparator<>(); + } + KeyAndValueSerializer<T> keyAndValueSerializer = + new KeyAndValueSerializer<>(typeSerializer, keyLength); + this.wrappedInput = wrappedInput; + this.sorter = + ExternalSorter.newBuilder( + memoryManager, + containingTask, + keyAndValueSerializer, + comparator, + executionConfig) + .memoryFraction(managedMemoryFraction) + .enableSpilling( + ioManager, + taskManagerConfiguration.get( + AlgorithmOptions.SORT_SPILLING_THRESHOLD)) + .maxNumFileHandles( + taskManagerConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN)) + .objectReuse(objectReuse) + .largeRecords( + taskManagerConfiguration.get( + AlgorithmOptions.USE_LARGE_RECORDS_HANDLER)) + .build(); + } catch (MemoryAllocationException e) { + throw new RuntimeException(e); + } + } + + @Override + public int getInputIndex() { + return wrappedInput.getInputIndex(); + } + + @Override + public CompletableFuture<Void> prepareSnapshot( + ChannelStateWriter channelStateWriter, long checkpointId) throws CheckpointException { + if (mode != OperatingMode.PROCESSING_REALTIME) { + throw new UnsupportedOperationException( + "Checkpoints are not supported during backlog."); + } + return wrappedInput.prepareSnapshot(channelStateWriter, checkpointId); + } + + @Override + public void close() throws IOException { + IOException ex = null; + try { + wrappedInput.close(); + } catch (IOException e) { + ex = ExceptionUtils.firstOrSuppressed(e, ex); + } + + try { + sorter.close(); + } catch (IOException e) { + ex = ExceptionUtils.firstOrSuppressed(e, ex); + } + + if (ex != null) { + throw ex; + } + } + + @Override + public DataInputStatus emitNext(DataOutput<T> 
output) throws Exception { + LOG.debug("Emit next, current mode: {}", mode); + if (sortingDataOutput.innerOutput != output) { + sortingDataOutput.innerOutput = output; + } + + if (mode == OperatingMode.PROCESSING_REALTIME) { + return wrappedInput.emitNext(sortingDataOutput); + } + + if (mode == OperatingMode.SORTING_BACKLOG) { + return wrappedInput.emitNext(sortingDataOutput); + } + + if (mode == OperatingMode.FLUSHING_BACKLOG) { + while (true) { + final DataInputStatus status = emitNextSortedRecord(output); + if (status == DataInputStatus.MORE_AVAILABLE + && canEmitBatchOfRecords.check() + && mode == OperatingMode.FLUSHING_BACKLOG) { + continue; + } + return status; + } + } + + // Should never reach here + throw new RuntimeException(String.format("Unknown OperatingMode %s", mode)); + } + + @Nonnull + private DataInputStatus emitNextSortedRecord(DataOutput<T> output) throws Exception { + Tuple2<byte[], StreamRecord<T>> next = sortedInput.next(); + if (next != null) { + output.emitRecord(next.f1); + } else { + // Finished flushing + mode = OperatingMode.PROCESSING_REALTIME; + + // Send backlog=false downstream + output.emitRecordAttributes( + new RecordAttributesBuilder(Collections.emptyList()).setBacklog(false).build()); + + if (watermarkSeenDuringBacklog > Long.MIN_VALUE) { + output.emitWatermark(new Watermark(watermarkSeenDuringBacklog)); + } + } + return DataInputStatus.MORE_AVAILABLE; + } + + @Override + public CompletableFuture<?> getAvailableFuture() { + if (mode == OperatingMode.FLUSHING_BACKLOG) { + return AvailabilityProvider.AVAILABLE; + } else { + return wrappedInput.getAvailableFuture(); + } + } + + private class SortingDataOutput implements DataOutput<T> { + + private DataOutput<T> innerOutput; + + @Override + public void emitRecord(StreamRecord<T> streamRecord) throws Exception { + LOG.debug("Emit record {}", streamRecord.getValue()); + if (mode == OperatingMode.PROCESSING_REALTIME) { + innerOutput.emitRecord(streamRecord); + return; + } + + if (mode 
== OperatingMode.SORTING_BACKLOG) { + K key = keySelector.getKey(streamRecord.getValue()); + + keySerializer.serialize(key, dataOutputSerializer); + byte[] serializedKey = dataOutputSerializer.getCopyOfBuffer(); + dataOutputSerializer.clear(); + + sorter.writeRecord(Tuple2.of(serializedKey, streamRecord)); + return; + } + + if (mode == OperatingMode.FLUSHING_BACKLOG) { + throw new RuntimeException("Unexpected StreamRecord during FLUSHING_BACKLOG."); + } + } + + @Override + public void emitWatermark(Watermark watermark) throws Exception { + if (mode == OperatingMode.PROCESSING_REALTIME) { + innerOutput.emitWatermark(watermark); + } else { + watermarkSeenDuringBacklog = + Math.max(watermarkSeenDuringBacklog, watermark.getTimestamp()); + } + } + + @Override + public void emitWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception { + if (mode == OperatingMode.PROCESSING_REALTIME) { + innerOutput.emitWatermarkStatus(watermarkStatus); + } + + // Ignore watermark status during backlog + } + + @Override + public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception { + if (mode == OperatingMode.PROCESSING_REALTIME) { + innerOutput.emitLatencyMarker(latencyMarker); + } + + // Ignore watermark status during backlog + } + + @Override + public void emitRecordAttributes(RecordAttributes recordAttributes) throws Exception { + LOG.debug("Emit record attributes {}", recordAttributes); + if (mode == OperatingMode.PROCESSING_REALTIME && recordAttributes.isBacklog()) { + // switch to backlog + mode = OperatingMode.SORTING_BACKLOG; + innerOutput.emitRecordAttributes(recordAttributes); + return; + } + + if (mode == OperatingMode.SORTING_BACKLOG && !recordAttributes.isBacklog()) { + innerOutput.emitRecordAttributes( + new InternalRecordAttributes(true, watermarkSeenDuringBacklog)); + sorter.finishReading(); + sortedInput = sorter.getIterator(); + mode = OperatingMode.FLUSHING_BACKLOG; + return; + } + + if (mode == OperatingMode.FLUSHING_BACKLOG && 
recordAttributes.isBacklog()) { + throw new RuntimeException( + "Should not receive record attribute while flushing backlog."); Review Comment: The ` && recordAttributes.isBacklog()` in this if condition should be removed according to the error message. ########## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordAttributesValveTest.java: ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.streaming.api.operators.source.CollectingDataOutput; +import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; +import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link RecordAttributesValve}. */ +class RecordAttributesValveTest { Review Comment: nit: public class. ########## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestTriggerable.java: ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.assertj.core.api.Assertions; + +import java.util.function.Consumer; + +/** Triggerable for test. */ +class TestTriggerable<K, N> implements Triggerable<K, N> { + + private final Consumer<InternalTimer<K, N>> eventTimeHandler; + private final Consumer<InternalTimer<K, N>> processingTimeHandler; + + public static <K, N> TestTriggerable<K, N> eventTimeTrigger( + Consumer<InternalTimer<K, N>> eventTimeHandler) { + return new TestTriggerable<>( + eventTimeHandler, + timer -> Assertions.fail("We did not expect processing timer to be triggered.")); + } + + public static <K, N> TestTriggerable<K, N> processingTimeTrigger( Review Comment: This method can be removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
