[ https://issues.apache.org/jira/browse/FLINK-9487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511453#comment-16511453 ]

ASF GitHub Bot commented on FLINK-9487:
---------------------------------------

Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6159#discussion_r195156251
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitioner.java ---
    @@ -0,0 +1,227 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.util.Preconditions;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +
    +/**
    + * Abstract class that contains the base algorithm for partitioning data into key-groups. This algorithm currently works
    + * with two array (input, output) for optimal algorithmic complexity. Notice that this could also be implemented over a
    + * single array, using some cuckoo-hashing-style element replacement. This would have worse algorithmic complexity but
    + * better space efficiency. We currently prefer the trade-off in favor of better algorithmic complexity.
    + */
    +public abstract class AbstractKeyGroupPartitioner<T> {
    +
    +   /** Total number of input elements. */
    +   @Nonnegative
    +   protected final int numberOfElements;
    +
    +   /** The total number of key-groups in the job. */
    +   @Nonnegative
    +   protected final int totalKeyGroups;
    +
    +   /** The key-group range for the input data, covered in this partitioning. */
    +   @Nonnull
    +   protected final KeyGroupRange keyGroupRange;
    +
    +   /**
    +    * This bookkeeping array is used to count the elements in each key-group. In a second step, it is transformed into
    +    * a histogram by accumulation.
    +    */
    +   @Nonnull
    +   protected final int[] counterHistogram;
    +
    +   /**
    +    * This is a helper array that caches the key-group for each element, so we do not have to compute them twice.
    +    */
    +   @Nonnull
    +   protected final int[] elementKeyGroups;
    +
    +   /** Cached value of keyGroupRange#firstKeyGroup. */
    +   @Nonnegative
    +   protected final int firstKeyGroup;
    +
    +   /** Cached result. */
    +   protected PartitioningResult<T> computedResult;
    +
    +   /**
    +    * @param keyGroupRange the key-group range of the data that will be partitioned by this instance.
    +    * @param totalKeyGroups the total number of key groups in the job.
    +    */
    +   public AbstractKeyGroupPartitioner(
    +           @Nonnegative int numberOfElements,
    +           @Nonnull KeyGroupRange keyGroupRange,
    +           @Nonnegative int totalKeyGroups) {
    +
    +           this.numberOfElements = numberOfElements;
    +           this.keyGroupRange = keyGroupRange;
    +           this.totalKeyGroups = totalKeyGroups;
    +           this.firstKeyGroup = keyGroupRange.getStartKeyGroup();
    +           this.elementKeyGroups = new int[numberOfElements];
    +           this.counterHistogram = new int[keyGroupRange.getNumberOfKeyGroups()];
    +           this.computedResult = null;
    +   }
    +
    +   /**
    +    * Partitions the data into key-groups and returns the result via {@link PartitioningResult}.
    +    */
    +   public PartitioningResult<T> partitionByKeyGroup() {
    +           if (computedResult == null) {
    +                   reportAllElementKeyGroups();
    +                   buildHistogramFromCounts();
    +                   executePartitioning();
    +           }
    +           return computedResult;
    +   }
    +
    +   /**
    +    * This method iterates over the input data and reports the key-group for each element.
    +    */
    +   protected void reportAllElementKeyGroups() {
    +           final T[] input = getPartitioningInput();
    +
    +           Preconditions.checkState(input.length >= numberOfElements);
    +
    +           for (int i = 0; i < numberOfElements; ++i) {
    +                   int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(extractKeyFromElement(input[i]), totalKeyGroups);
    +                   reportKeyGroupOfElementAtIndex(i, keyGroup);
    +           }
    +   }
    +
    +   /**
    +    * Returns the key for the given element by which the key-group can be computed.
    +    */
    +   @Nonnull
    +   protected abstract Object extractKeyFromElement(T element);
    +
    +   /**
    +    * Returns the input data for the partitioning. All elements to consider must be densely in the index interval
    +    * [0, {@link #numberOfElements}[, without null values.
    +    */
    +   @Nonnull
    +   protected abstract T[] getPartitioningInput();
    +
    +   /**
    +    * Returns the output array for the partitioning. The size must be {@link #numberOfElements} (or bigger).
    +    */
    +   @Nonnull
    +   protected abstract T[] getPartitioningOutput();
    +
    +   /**
    +    * This method reports in the bookkeeping data that the element at the given index belongs to the given key-group.
    +    */
    +   protected void reportKeyGroupOfElementAtIndex(int index, int keyGroup) {
    +           final int keyGroupIndex = keyGroup - firstKeyGroup;
    +           elementKeyGroups[index] = keyGroupIndex;
    +           ++counterHistogram[keyGroupIndex];
    +   }
    +
    +   /**
    +    * This method creates a histogram from the counts per key-group in {@link #counterHistogram}.
    +    */
    +   private void buildHistogramFromCounts() {
    +           int sum = 0;
    +           for (int i = 0; i < counterHistogram.length; ++i) {
    +                   int currentSlotValue = counterHistogram[i];
    +                   counterHistogram[i] = sum;
    +                   sum += currentSlotValue;
    +           }
    +   }
    +
    +   private void executePartitioning() {
    +
    +           final T[] in = getPartitioningInput();
    +           final T[] out = getPartitioningOutput();
    +
    +           Preconditions.checkState(in != out);
    +           Preconditions.checkState(in.length >= numberOfElements);
    +           Preconditions.checkState(out.length >= numberOfElements);
    --- End diff --
    
    This looks like a sanity check, but for `CopyOnWriteStateTable` there seems
    to be a loophole: if the number of elements in the backend exceeds `1 << 30`,
    this check could fail. That would mean the current version does not support
    storing more than `1 << 30` records in the heap-based backend, whereas the
    previous version did (it only logged a warning when the state grew too
    large). Is this intended?
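
    For context, the three steps in the diff above (count per key-group,
    turn the counts into start offsets, scatter into the output array) can be
    sketched in standalone form. This is only an illustration under stated
    assumptions, not the code from the pull request: the class
    `KeyGroupPartitionSketch`, the `partition` signature, and the
    `keyGroupOf` helper (a plain `hashCode` modulo, not Flink's real
    key-group assignment) are all hypothetical. The precondition at the top
    mirrors the checks under discussion: it fails as soon as either array is
    shorter than `numberOfElements`.

```java
import java.util.Arrays;

final class KeyGroupPartitionSketch {

    /** Hypothetical stand-in for the key-group assignment; NOT Flink's hashing. */
    static int keyGroupOf(String key, int totalKeyGroups) {
        return Math.floorMod(key.hashCode(), totalKeyGroups);
    }

    /**
     * Copies the first numberOfElements entries of in into out, grouped by key-group.
     * Returns offsets where group g (relative to firstKeyGroup) occupies
     * out[offsets[g]] .. out[offsets[g + 1] - 1].
     */
    static int[] partition(
            String[] in,
            String[] out,
            int numberOfElements,
            int firstKeyGroup,
            int numberOfKeyGroups,
            int totalKeyGroups) {

        // Same shape as the preconditions in the diff: distinct arrays, both large enough.
        if (in == out || in.length < numberOfElements || out.length < numberOfElements) {
            throw new IllegalStateException("input/output arrays do not fit numberOfElements");
        }

        int[] elementKeyGroups = new int[numberOfElements]; // cached group index per element
        int[] counts = new int[numberOfKeyGroups];

        // Step 1: count the elements in each key-group and cache the group index.
        for (int i = 0; i < numberOfElements; ++i) {
            int groupIndex = keyGroupOf(in[i], totalKeyGroups) - firstKeyGroup;
            elementKeyGroups[i] = groupIndex;
            ++counts[groupIndex];
        }

        // Step 2: exclusive prefix sum turns the counts into start offsets.
        int[] offsets = new int[numberOfKeyGroups + 1];
        for (int g = 0; g < numberOfKeyGroups; ++g) {
            offsets[g + 1] = offsets[g] + counts[g];
        }

        // Step 3: scatter every element to the next free slot of its group.
        int[] next = Arrays.copyOf(offsets, numberOfKeyGroups);
        for (int i = 0; i < numberOfElements; ++i) {
            out[next[elementKeyGroups[i]]++] = in[i];
        }
        return offsets;
    }

    public static void main(String[] args) {
        String[] in = {"a", "d", "b", "a", "c", "d"};
        String[] out = new String[in.length];
        int[] offsets = partition(in, out, in.length, 0, 4, 4);
        System.out.println(Arrays.toString(out));
        System.out.println(Arrays.toString(offsets));
    }
}
```

    The sketch uses two arrays, as the class Javadoc in the diff describes,
    so each element is read and written once after the counting pass. The
    cost is that the output array has to hold every element, which is why the
    length checks matter for very large tables.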


> Prepare InternalTimerHeap for asynchronous snapshots
> ----------------------------------------------------
>
>                 Key: FLINK-9487
>                 URL: https://issues.apache.org/jira/browse/FLINK-9487
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing, Streaming
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>             Fix For: 1.6.0
>
>
> When we want to snapshot timers with the keyed backend state, this must 
> happen as part of an asynchronous snapshot.
> The data structure {{InternalTimerHeap}} needs to offer support for this 
> through a lightweight copy mechanism (e.g. arraycopy of the timer queue, 
> because timers are immutable w.r.t. serialization).
> We can also stop keeping the dedup maps in {{InternalTimerHeap}} separated by 
> key-group, all timers can go into one map.
> Instead, we can implement online-partitioning as part of the asynchronous 
> operation, similar to what we do in {{CopyOnWriteStateTable}} snapshots. 
> Notice that in this intermediate state, the code will still run in the 
> synchronous part until we are integrated with the backends for async 
> snapshotting (next subtask of this jira).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
