Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6062#discussion_r190295347
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java
 ---
    @@ -0,0 +1,504 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.operators;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.Queue;
    +import java.util.Set;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +
    +/**
    + * A heap-based priority queue for internal timers. This heap is supported 
by hash sets for fast contains
    + * (de-duplication) and deletes. The heap implementation is a simple 
binary tree stored inside an array. Element indexes
    + * in the heap array start at 1 instead of 0 to make array index 
computations a bit simpler in the hot methods.
    + *
    + * <p>Possible future improvements:
    + * <ul>
    + *  <li>We could also implement shrinking for the heap and the 
deduplication maps.</li>
    + *  <li>We could replace the deduplication maps with more efficient custom 
implementations. In particular, a hash set
    + * would be enough if it could return existing elements on unsuccessful 
adding, etc..</li>
    + * </ul>
    + *
    + * @param <K> type of the key of the internal timers managed by this 
priority queue.
    + * @param <N> type of the namespace of the internal timers managed by this 
priority queue.
    + */
    +public class InternalTimerHeap<K, N> implements 
Queue<TimerHeapInternalTimer<K, N>>, Set<TimerHeapInternalTimer<K, N>> {
    +
    +   /**
    +    * A safe maximum size for arrays in the JVM.
    +    */
    +   private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    +
    +   /**
    +    * Comparator for {@link TimerHeapInternalTimer}, based on the 
timestamp in ascending order.
    +    */
    +   private static final Comparator<TimerHeapInternalTimer<?, ?>> 
COMPARATOR =
    +           (o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp());
    +
    +   /**
    +    * This array contains one hash set per key-group. The sets are used 
for fast de-duplication and deletes of timers.
    +    */
    +   private final HashMap<TimerHeapInternalTimer<K, N>, 
TimerHeapInternalTimer<K, N>>[] deduplicationMapsByKeyGroup;
    +
    +   /**
    +    * The array that represents the heap-organized priority queue.
    +    */
    +   private TimerHeapInternalTimer<K, N>[] queue;
    +
    +   /**
    +    * The current size of the priority queue.
    +    */
    +   private int size;
    +
    +   /**
    +    * The key-group range of timers that are managed by this queue.
    +    */
    +   private final KeyGroupRange keyGroupRange;
    +
    +   /**
    +    * The total number of key-groups of the job.
    +    */
    +   private final int totalNumberOfKeyGroups;
    +
    +
    +   /**
    +    * Creates an empty {@link InternalTimerHeap} with the requested 
initial capacity.
    +    *
    +    * @param minimumCapacity the minimum and initial capacity of this 
priority queue.
    +    */
    +   @SuppressWarnings("unchecked")
    +   InternalTimerHeap(
    +           @Nonnegative int minimumCapacity,
    +           @Nonnull KeyGroupRange keyGroupRange,
    +           @Nonnegative int totalNumberOfKeyGroups) {
    +
    +           this.totalNumberOfKeyGroups = totalNumberOfKeyGroups;
    +           this.keyGroupRange = keyGroupRange;
    +
    +           final int keyGroupsInLocalRange = 
keyGroupRange.getNumberOfKeyGroups();
    +           final int deduplicationSetSize = 1 + minimumCapacity / 
keyGroupsInLocalRange;
    +           this.deduplicationMapsByKeyGroup = new 
HashMap[keyGroupsInLocalRange];
    +           for (int i = 0; i < keyGroupsInLocalRange; ++i) {
    +                   deduplicationMapsByKeyGroup[i] = new 
HashMap<>(deduplicationSetSize);
    +           }
    +
    +           this.queue = new TimerHeapInternalTimer[1 + minimumCapacity];
    +   }
    +
    +   /**
    +    * @see Set#add(Object)
    +    */
    +   @Override
    +   public boolean add(@Nonnull TimerHeapInternalTimer<K, N> timer) {
    +
    +           if (getDedupMapForKeyGroup(timer).putIfAbsent(timer, timer) == 
null) {
    +                   final int newSize = ++this.size;
    +                   checkCapacity(newSize);
    +                   moveElementToIdx(timer, newSize);
    +                   siftUp(newSize);
    +                   return true;
    +           } else {
    +                   return false;
    +           }
    +   }
    +
    +   /**
    +    * This behaves like {@link #add(TimerHeapInternalTimer)}.
    +    */
    +   @Override
    +   public boolean offer(@Nonnull TimerHeapInternalTimer<K, N> k) {
    +           return add(k);
    +   }
    +
    +   @Nullable
    +   @Override
    +   public TimerHeapInternalTimer<K, N> poll() {
    +           return size() > 0 ? removeElementAtIndex(1) : null;
    +   }
    +
    +   @Nonnull
    +   @Override
    +   public TimerHeapInternalTimer<K, N> remove() {
    +           TimerHeapInternalTimer<K, N> pollResult = poll();
    +           if (pollResult != null) {
    +                   return pollResult;
    +           } else {
    +                   throw new 
NoSuchElementException("InternalTimerPriorityQueue is empty.");
    +           }
    +   }
    +
    +   @Nullable
    +   @Override
    +   public TimerHeapInternalTimer<K, N> peek() {
    +           return size() > 0 ? queue[1] : null;
    +   }
    +
    +   @Nonnull
    +   @Override
    +   public TimerHeapInternalTimer<K, N> element() {
    +           TimerHeapInternalTimer<K, N> peekResult = peek();
    +           if (peekResult != null) {
    +                   return peekResult;
    +           } else {
    +                   throw new 
NoSuchElementException("InternalTimerPriorityQueue is empty.");
    +           }
    +   }
    +
    +   @Override
    +   public boolean isEmpty() {
    +           return size() == 0;
    +   }
    +
    +   @Override
    +   public boolean contains(@Nullable Object o) {
    +           return (o instanceof TimerHeapInternalTimer)
    +                   && getDedupMapForKeyGroup((TimerHeapInternalTimer<?, 
?>) o).containsKey(o);
    +   }
    +
    +   @Override
    +   public boolean remove(@Nullable Object o) {
    +           if (o instanceof TimerHeapInternalTimer) {
    +                   return removeInternal((TimerHeapInternalTimer<?, ?>) o);
    +           }
    +           return false;
    +   }
    +
    +   @Override
    +   public boolean addAll(@Nullable Collection<? extends 
TimerHeapInternalTimer<K, N>> timers) {
    +
    +           if (timers == null) {
    +                   return true;
    +           }
    +
    +           if (timers.size() > queue.length) {
    --- End diff --
    
    I think maybe `if (timers.size() + size() > queue.length)` is better, because 
we can ensure that we only need to resize the array once with this approach. 
With the current version, we may need to resize it multiple times. What do you 
think?


---

Reply via email to