[
https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544879#comment-16544879
]
ASF GitHub Bot commented on FLINK-9489:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6333#discussion_r202589096
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java ---
@@ -80,13 +82,57 @@ public E peek() {
 	@Override
 	public void bulkPoll(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) {
+		if (ENABLE_RELAXED_FIRING_ORDER_OPTIMIZATION) {
+			bulkPollRelaxedOrder(canConsume, consumer);
+		} else {
+			bulkPollStrictOrder(canConsume, consumer);
+		}
+	}
+
+	private void bulkPollRelaxedOrder(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) {
+		if (orderedCache.isEmpty()) {
+			bulkPollStore(canConsume, consumer);
+		} else {
+			while (!orderedCache.isEmpty() && canConsume.test(orderedCache.peekFirst())) {
+				final E next = orderedCache.removeFirst();
+				orderedStore.remove(next);
+				consumer.accept(next);
+			}
+
+			if (orderedCache.isEmpty()) {
+				bulkPollStore(canConsume, consumer);
+			}
+		}
+	}
+
+	private void bulkPollStrictOrder(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) {
 		E element;
 		while ((element = peek()) != null && canConsume.test(element)) {
 			poll();
 			consumer.accept(element);
 		}
 	}
+
+	private void bulkPollStore(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) {
+		try (CloseableIterator<E> iterator = orderedStore.orderedIterator()) {
+			while (iterator.hasNext()) {
+				final E next = iterator.next();
+				if (canConsume.test(next)) {
+					orderedStore.remove(next);
+					consumer.accept(next);
+				} else {
+					orderedCache.add(next);
+					while (iterator.hasNext() && !orderedCache.isFull()) {
+						orderedCache.add(iterator.next());
+					}
+					break;
+				}
+			}
+		} catch (Exception e) {
+			throw new FlinkRuntimeException("Exception while bulk polling store.", e);
+		}
+	}
--- End diff ---
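For readers following the review, the cache-plus-store pattern in the diff can be sketched with plain JDK collections. This is a simplified, standalone model, not Flink's actual CachingInternalPriorityQueueSet: the names `cache`, `store`, and `CACHE_CAPACITY`, and the eager cache-refill strategy, are illustrative assumptions only.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.TreeSet;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Simplified model of the bulk poll discussed in the diff: a small ordered
// cache holds the smallest elements and fronts a larger ordered store.
class BulkPollSketch {
	static final int CACHE_CAPACITY = 3; // illustrative capacity

	final ArrayDeque<Integer> cache = new ArrayDeque<>(); // smallest elements, ascending
	final TreeSet<Integer> store = new TreeSet<>();       // full ordered contents

	void add(int e) {
		store.add(e);
		refillCache();
	}

	// Relaxed order: drain the cache first, then fall back to the store.
	void bulkPollRelaxedOrder(Predicate<Integer> canConsume, Consumer<Integer> consumer) {
		while (!cache.isEmpty() && canConsume.test(cache.peekFirst())) {
			Integer next = cache.removeFirst();
			store.remove(next);
			consumer.accept(next);
		}
		if (cache.isEmpty()) {
			bulkPollStore(canConsume, consumer);
		}
	}

	// Consume matching elements from the store; stop at the first mismatch,
	// since an ordered store guarantees no later element can match either.
	void bulkPollStore(Predicate<Integer> canConsume, Consumer<Integer> consumer) {
		Iterator<Integer> it = store.iterator();
		while (it.hasNext()) {
			Integer next = it.next();
			if (!canConsume.test(next)) {
				break;
			}
			it.remove();
			consumer.accept(next);
		}
		refillCache();
	}

	void refillCache() {
		cache.clear();
		for (Integer e : store) {
			if (cache.size() >= CACHE_CAPACITY) {
				break;
			}
			cache.addLast(e);
		}
	}
}
```

The real class instead refills the cache lazily while iterating the (possibly RocksDB-backed) store, which avoids rescanning it on every insert.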
Because it makes it more explicit that there are things which can go wrong. With checked exceptions you still have the chance to let the program fail, but without them the caller needs to know that there are unchecked exceptions in order to do any recovery.
Moreover, I'm not sure whether we should decide at this level how recovery is done. For example, maybe the caller can fetch the latest checkpoint data again and replay all in-between elements in order to recompute the state. This is something the priority queue should not need to bother about.
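The trade-off raised in the comment can be illustrated with a minimal sketch. The interface and names below are hypothetical, not Flink's actual API: with a checked exception the compiler forces the caller to choose a recovery strategy at the call site, whereas an unchecked wrapper leaves that contract implicit.

```java
import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical checked variant of the bulk-poll contract: failure is
// part of the method signature, so the queue does not prescribe recovery.
interface BulkPollable<E> {
	void bulkPoll(Predicate<E> canConsume, Consumer<E> consumer) throws IOException;
}

class Caller {
	// The caller, not the queue, decides what recovery means here.
	static String drain(BulkPollable<Integer> queue) {
		try {
			queue.bulkPoll(e -> true, e -> { });
			return "ok";
		} catch (IOException e) {
			// e.g. re-fetch the latest checkpoint and replay the
			// in-between elements to recompute the state.
			return "recovered";
		}
	}
}
```

Wrapping in a `FlinkRuntimeException`, as the diff does, keeps the signature clean but moves this decision out of the type system, which is exactly the reviewer's concern.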
> Checkpoint timers as part of managed keyed state instead of raw keyed state
> ---------------------------------------------------------------------------
>
> Key: FLINK-9489
> URL: https://issues.apache.org/jira/browse/FLINK-9489
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Timer state should now become part of the keyed state backend snapshot, i.e.,
> stored inside the managed keyed state. This means that we have to connect our
> preparation for asynchronous checkpoints with the backend, so that the timers
> are written as part of the state for each key-group. This means that we will
> also free up the raw keyed state and might expose it to user functions in the
> future.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)