[FLINK-9486][state] Introduce InternalPriorityQueue as state in keyed state backends
This commit does not include the integration with checkpointing. This closes #6276. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/79b38f8f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/79b38f8f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/79b38f8f Branch: refs/heads/master Commit: 79b38f8f9a79b917d525842cf46087c5b8c40f3d Parents: b12acea Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Wed Jul 4 13:43:49 2018 +0200 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Mon Jul 9 16:12:51 2018 +0200 ---------------------------------------------------------------------- .../KVStateRequestSerializerRocksDBTest.java | 16 +- .../network/KvStateRequestSerializerTest.java | 19 +- .../runtime/state/InternalPriorityQueue.java | 12 + .../state/KeyGroupedInternalPriorityQueue.java | 38 ++++ .../flink/runtime/state/KeyedStateBackend.java | 3 +- .../flink/runtime/state/PriorityComparator.java | 42 ++++ .../runtime/state/PriorityQueueSetFactory.java | 46 ++++ .../state/TieBreakingPriorityComparator.java | 122 ++++++++++ .../state/filesystem/FsStateBackend.java | 6 +- .../heap/CachingInternalPriorityQueueSet.java | 26 ++- .../state/heap/HeapKeyedStateBackend.java | 28 ++- .../runtime/state/heap/HeapPriorityQueue.java | 35 ++- .../state/heap/HeapPriorityQueueSet.java | 46 ++-- .../state/heap/HeapPriorityQueueSetFactory.java | 69 ++++++ .../heap/KeyGroupPartitionedPriorityQueue.java | 63 ++++-- .../runtime/state/heap/TreeOrderedSetCache.java | 7 + .../state/memory/MemoryStateBackend.java | 7 +- .../state/InternalPriorityQueueTestBase.java | 12 +- .../state/StateSnapshotCompressionTest.java | 12 +- .../state/heap/HeapPriorityQueueSetTest.java | 2 +- .../state/heap/HeapPriorityQueueTest.java | 2 +- .../state/heap/HeapStateBackendTestBase.java | 10 +- .../KeyGroupPartitionedPriorityQueueTest.java | 2 +- .../streaming/state/RockDBBackendOptions.java | 38 ++++ 
.../state/RocksDBKeyedStateBackend.java | 171 +++++++++++++- .../streaming/state/RocksDBOrderedSetStore.java | 13 +- .../streaming/state/RocksDBStateBackend.java | 22 +- ...nalPriorityQueueSetWithRocksDBStoreTest.java | 1 - .../state/RocksDBOrderedSetStoreTest.java | 1 - .../state/RocksDBStateBackendTest.java | 3 +- .../api/operators/AbstractStreamOperator.java | 6 +- .../api/operators/HeapInternalTimerService.java | 85 +++---- .../operators/InternalTimeServiceManager.java | 80 ++++--- .../streaming/api/operators/InternalTimer.java | 22 ++ .../InternalTimerServiceSerializationProxy.java | 65 +++--- .../StreamTaskStateInitializerImpl.java | 1 + .../api/operators/TimerHeapInternalTimer.java | 23 -- .../api/operators/TimerSerializer.java | 222 +++++++++++++++++++ .../operators/HeapInternalTimerServiceTest.java | 138 ++++++++---- 39 files changed, 1234 insertions(+), 282 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java index a49fdd2..9ea3198 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeutils.base.LongSerializer; import 
org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.contrib.streaming.state.PredefinedOptions; import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.queryablestate.client.VoidNamespace; import org.apache.flink.queryablestate.client.VoidNamespaceSerializer; import org.apache.flink.runtime.query.TaskKvStateRegistry; @@ -74,9 +75,12 @@ public final class KVStateRequestSerializerRocksDBTest { columnFamilyOptions, mock(TaskKvStateRegistry.class), LongSerializer.INSTANCE, - 1, new KeyGroupRange(0, 0), - new ExecutionConfig(), false, - TestLocalRecoveryConfig.disabled() + 1, + new KeyGroupRange(0, 0), + new ExecutionConfig(), + false, + TestLocalRecoveryConfig.disabled(), + RocksDBStateBackend.PriorityQueueStateType.HEAP ); longHeapKeyedStateBackend.restore(null); longHeapKeyedStateBackend.setCurrentKey(key); @@ -112,10 +116,12 @@ public final class KVStateRequestSerializerRocksDBTest { columnFamilyOptions, mock(TaskKvStateRegistry.class), LongSerializer.INSTANCE, - 1, new KeyGroupRange(0, 0), + 1, + new KeyGroupRange(0, 0), new ExecutionConfig(), false, - TestLocalRecoveryConfig.disabled()); + TestLocalRecoveryConfig.disabled(), + RocksDBStateBackend.PriorityQueueStateType.HEAP); longHeapKeyedStateBackend.restore(null); longHeapKeyedStateBackend.setCurrentKey(key); http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java index 
2ba7507..73f8831 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.TestLocalRecoveryConfig; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.runtime.state.internal.InternalMapState; @@ -185,18 +186,19 @@ public class KvStateRequestSerializerTest { @Test public void testListSerialization() throws Exception { final long key = 0L; - + final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0); // objects for heap state list serialisation final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend = new HeapKeyedStateBackend<>( mock(TaskKvStateRegistry.class), LongSerializer.INSTANCE, ClassLoader.getSystemClassLoader(), - 1, - new KeyGroupRange(0, 0), + keyGroupRange.getNumberOfKeyGroups(), + keyGroupRange, async, new ExecutionConfig(), - TestLocalRecoveryConfig.disabled() + TestLocalRecoveryConfig.disabled(), + new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128) ); longHeapKeyedStateBackend.setCurrentKey(key); @@ -292,18 +294,19 @@ public class KvStateRequestSerializerTest { @Test public void testMapSerialization() throws Exception { final long key = 0L; - + final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0); // objects for heap state list serialisation final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend = new HeapKeyedStateBackend<>( 
mock(TaskKvStateRegistry.class), LongSerializer.INSTANCE, ClassLoader.getSystemClassLoader(), - 1, - new KeyGroupRange(0, 0), + keyGroupRange.getNumberOfKeyGroups(), + keyGroupRange, async, new ExecutionConfig(), - TestLocalRecoveryConfig.disabled() + TestLocalRecoveryConfig.disabled(), + new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128) ); longHeapKeyedStateBackend.setCurrentKey(key); http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java index fb3ee82..dc46c8a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java @@ -26,6 +26,8 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.Collection; +import java.util.function.Consumer; +import java.util.function.Predicate; /** * Interface for collection that gives in order access to elements w.r.t their priority. @@ -36,6 +38,16 @@ import java.util.Collection; public interface InternalPriorityQueue<T> { /** + * Polls from the top of the queue as long as the queue is not empty and passes the elements to + * {@link Consumer} until a {@link Predicate} rejects an offered element. The rejected element is not + * removed from the queue and becomes the new head. + * + * @param canConsume bulk polling ends once this returns false. The rejected element is not removed and not consumed. + * @param consumer consumer function for elements accepted by canConsume.
+ */ + void bulkPoll(@Nonnull Predicate<T> canConsume, @Nonnull Consumer<T> consumer); + + /** * Retrieves and removes the first element (w.r.t. the order) of this set, * or returns {@code null} if this set is empty. * http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupedInternalPriorityQueue.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupedInternalPriorityQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupedInternalPriorityQueue.java new file mode 100644 index 0000000..68472e2 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupedInternalPriorityQueue.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import javax.annotation.Nonnull; + +import java.util.Set; + +/** + * This interface exists as (temporary) adapter between the new {@link InternalPriorityQueue} and the old way in which + * timers are written in a snapshot. 
This interface can probably go away once timer state becomes part of the + * keyed state backend snapshot. + */ +public interface KeyGroupedInternalPriorityQueue<T> extends InternalPriorityQueue<T> { + + /** + * Returns the subset of elements in the priority queue that belongs to the given key-group, within the operator's + * key-group range. + */ + @Nonnull + Set<T> getSubsetForKeyGroup(int keyGroupId); +} http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java index ad75a1f..7ba14b3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java @@ -31,7 +31,8 @@ import java.util.stream.Stream; * * @param <K> The key by which state is keyed. */ -public interface KeyedStateBackend<K> extends InternalKeyContext<K>, KeyedStateFactory, Disposable { +public interface KeyedStateBackend<K> + extends InternalKeyContext<K>, KeyedStateFactory, PriorityQueueSetFactory, Disposable { /** * Sets the current key that is used for partitioned state. 
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java new file mode 100644 index 0000000..2f6f5a7 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +/** + * This interface works similarly to {@link Comparable} and is used to prioritize between two objects. The main difference + * between this interface and {@link Comparable} is that it is not required to follow the usual contract between + * {@link Comparable#compareTo(Object)} and {@link Object#equals(Object)}. The contract of this interface is: + * When two objects are equal, they indicate the same priority, but indicating the same priority does not require that + * both objects are equal. + * + * @param <T> type of the compared objects.
+ */ +@FunctionalInterface +public interface PriorityComparator<T> { + + /** + * Compares two objects for priority. Returns a negative integer, zero, or a positive integer as the first + * argument has lower, equal to, or higher priority than the second. + * @param left left operand in the comparison by priority. + * @param right right operand in the comparison by priority. + * @return a negative integer, zero, or a positive integer as the first argument has lower, equal to, or higher + * priority than the second. + */ + int comparePriority(T left, T right); +} http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java new file mode 100644 index 0000000..6f509c0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; + +import javax.annotation.Nonnull; + +/** + * Factory for {@link KeyGroupedInternalPriorityQueue} instances. + */ +public interface PriorityQueueSetFactory { + + /** + * Creates a {@link KeyGroupedInternalPriorityQueue}. + * + * @param stateName unique name associated with this queue. + * @param byteOrderedElementSerializer a serializer with a format that is lexicographically ordered in + * alignment with elementPriorityComparator. + * @param <T> type of the stored elements. + * @return the queue with the specified unique name. + */ + @Nonnull + <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create( + @Nonnull String stateName, + @Nonnull TypeSerializer<T> byteOrderedElementSerializer, + @Nonnull PriorityComparator<T> elementPriorityComparator, + @Nonnull KeyExtractorFunction<T> keyExtractor); +} http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java new file mode 100644 index 0000000..4384eb7 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Comparator; + +/** + * This class is an adapter between {@link PriorityComparator} and a full {@link Comparator} that respects the + * contract between {@link Comparator#compare(Object, Object)} and {@link Object#equals(Object)}. This is currently + * needed for implementations of + * {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetCache} that are implemented + * on top of a data structure that relies on this contract, e.g. a tree set. We should replace this in the near + * future. + * + * @param <T> type of the compared elements. + */ +public class TieBreakingPriorityComparator<T> implements Comparator<T>, PriorityComparator<T> { + + /** The {@link PriorityComparator} to which we delegate in a first step. */ + @Nonnull + private final PriorityComparator<T> priorityComparator; + + /** Serializer for instances of the compared objects. */ + @Nonnull + private final TypeSerializer<T> serializer; + + /** Stream that we use in serialization.
*/ + @Nonnull + private final ByteArrayOutputStreamWithPos outStream; + + /** {@link org.apache.flink.core.memory.DataOutputView} around outStream. */ + @Nonnull + private final DataOutputViewStreamWrapper outView; + + public TieBreakingPriorityComparator( + @Nonnull PriorityComparator<T> priorityComparator, + @Nonnull TypeSerializer<T> serializer, + @Nonnull ByteArrayOutputStreamWithPos outStream, + @Nonnull DataOutputViewStreamWrapper outView) { + + this.priorityComparator = priorityComparator; + this.serializer = serializer; + this.outStream = outStream; + this.outView = outView; + } + + @SuppressWarnings("unchecked") + @Override + public int compare(T o1, T o2) { + + // first we compare priority, this should be the most commonly hit case + int cmp = priorityComparator.comparePriority(o1, o2); + + if (cmp != 0) { + return cmp; + } + + // here we start tie breaking and do our best to comply with the compareTo/equals contract, first we try + // to simply find an existing way to fully compare. + if (o1 instanceof Comparable && o1.getClass().equals(o2.getClass())) { + return ((Comparable<T>) o1).compareTo(o2); + } + + // we catch this case before moving to more expensive tie breaks. + if (o1.equals(o2)) { + return 0; + } + + // if objects are not equal, their serialized form should somehow differ as well. this can be costly, and... + // TODO we should have an alternative approach in the future, e.g. a cache that does not rely on compare to check equality. 
+ try { + outStream.reset(); + serializer.serialize(o1, outView); + int leftLen = outStream.getPosition(); + serializer.serialize(o2, outView); + int rightLen = outStream.getPosition() - leftLen; + return compareBytes(outStream.getBuf(), 0, leftLen, leftLen, rightLen); + } catch (IOException ex) { + throw new FlinkRuntimeException("Serializer problem in comparator.", ex); + } + } + + @Override + public int comparePriority(T left, T right) { + return priorityComparator.comparePriority(left, right); + } + + public static int compareBytes(byte[] bytes, int offLeft, int leftLen, int offRight, int rightLen) { + int maxLen = Math.min(leftLen, rightLen); + for (int i = 0; i < maxLen; ++i) { + int cmp = bytes[offLeft + i] - bytes[offRight + i]; + if (cmp != 0) { + return cmp; + } + } + return leftLen - rightLen; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java index 637effd..ad1581b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; import org.apache.flink.util.TernaryBoolean; import org.slf4j.LoggerFactory; @@ -457,6 +458,8 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur TaskStateManager 
taskStateManager = env.getTaskStateManager(); LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig(); + HeapPriorityQueueSetFactory priorityQueueSetFactory = + new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128); return new HeapKeyedStateBackend<>( kvStateRegistry, @@ -466,7 +469,8 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur keyGroupRange, isUsingAsynchronousSnapshots(), env.getExecutionConfig(), - localRecoveryConfig); + localRecoveryConfig, + priorityQueueSetFactory); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java index 771315d..6dc8cf3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java @@ -27,6 +27,8 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.Collection; +import java.util.function.Consumer; +import java.util.function.Predicate; /** * This class is an implementation of a {@link InternalPriorityQueue} with set semantics that internally consists of @@ -76,6 +78,15 @@ public class CachingInternalPriorityQueueSet<E> implements InternalPriorityQueue return orderedCache.peekFirst(); } + @Override + public void bulkPoll(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) { + E element; + while ((element = peek()) != null && canConsume.test(element)) { + poll(); + consumer.accept(element); + } + } + @Nullable @Override public E poll() { @@ -158,7 +169,11 
@@ public class CachingInternalPriorityQueueSet<E> implements InternalPriorityQueue @Nonnull @Override public CloseableIterator<E> iterator() { - return orderedStore.orderedIterator(); + if (storeOnlyElements) { + return orderedStore.orderedIterator(); + } else { + return orderedCache.orderedIterator(); + } } @Override @@ -184,7 +199,7 @@ public class CachingInternalPriorityQueueSet<E> implements InternalPriorityQueue } storeOnlyElements = iterator.hasNext(); } catch (Exception e) { - throw new FlinkRuntimeException("Exception while closing RocksDB iterator.", e); + throw new FlinkRuntimeException("Exception while refilling store from iterator.", e); } } } @@ -249,6 +264,13 @@ public class CachingInternalPriorityQueueSet<E> implements InternalPriorityQueue */ @Nullable E peekLast(); + + /** + * Returns an iterator over the store that returns element in order. The iterator must be closed by the client + * after usage. + */ + @Nonnull + CloseableIterator<E> orderedIterator(); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 82ce584..b5b2626 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -44,13 +44,17 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider; import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.DoneFuture; +import org.apache.flink.runtime.state.KeyExtractorFunction; import 
org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; import org.apache.flink.runtime.state.KeyedStateFunction; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.LocalRecoveryConfig; +import org.apache.flink.runtime.state.PriorityComparator; +import org.apache.flink.runtime.state.PriorityQueueSetFactory; import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator; import org.apache.flink.runtime.state.SnapshotResult; @@ -102,6 +106,21 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { Tuple2.of(FoldingStateDescriptor.class, (StateFactory) HeapFoldingState::create) ).collect(Collectors.toMap(t -> t.f0, t -> t.f1)); + @Nonnull + @Override + public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create( + @Nonnull String stateName, + @Nonnull TypeSerializer<T> byteOrderedElementSerializer, + @Nonnull PriorityComparator<T> elementPriorityComparator, + @Nonnull KeyExtractorFunction<T> keyExtractor) { + + return priorityQueueSetFactory.create( + stateName, + byteOrderedElementSerializer, + elementPriorityComparator, + keyExtractor); + } + private interface StateFactory { <K, N, SV, S extends State, IS extends S> IS createState( StateDescriptor<S, SV> stateDesc, @@ -137,6 +156,11 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { */ private final HeapSnapshotStrategy snapshotStrategy; + /** + * Factory for state that is organized as priority queue. 
+ */ + private final PriorityQueueSetFactory priorityQueueSetFactory; + public HeapKeyedStateBackend( TaskKvStateRegistry kvStateRegistry, TypeSerializer<K> keySerializer, @@ -145,7 +169,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { KeyGroupRange keyGroupRange, boolean asynchronousSnapshots, ExecutionConfig executionConfig, - LocalRecoveryConfig localRecoveryConfig) { + LocalRecoveryConfig localRecoveryConfig, + PriorityQueueSetFactory priorityQueueSetFactory) { super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig); this.localRecoveryConfig = Preconditions.checkNotNull(localRecoveryConfig); @@ -157,6 +182,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { this.snapshotStrategy = new HeapSnapshotStrategy(synchronicityTrait); LOG.info("Initializing heap keyed state backend with stream factory."); this.restoredKvStateMetaInfos = new HashMap<>(); + this.priorityQueueSetFactory = Preconditions.checkNotNull(priorityQueueSetFactory); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java index 7017905..e5f610e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.state.heap; import org.apache.flink.runtime.state.InternalPriorityQueue; +import org.apache.flink.runtime.state.PriorityComparator; import org.apache.flink.util.CloseableIterator; import
javax.annotation.Nonnegative; @@ -27,9 +28,10 @@ import javax.annotation.Nullable; import java.util.Arrays; import java.util.Collection; -import java.util.Comparator; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.function.Consumer; +import java.util.function.Predicate; import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE; @@ -56,9 +58,9 @@ public class HeapPriorityQueue<T extends HeapPriorityQueueElement> implements In private static final int QUEUE_HEAD_INDEX = 1; /** - * Comparator for the contained elements. + * Comparator for the priority of contained elements. */ - private final Comparator<T> elementComparator; + private final PriorityComparator<T> elementPriorityComparator; /** * The array that represents the heap-organized priority queue. @@ -73,19 +75,28 @@ public class HeapPriorityQueue<T extends HeapPriorityQueueElement> implements In /** * Creates an empty {@link HeapPriorityQueue} with the requested initial capacity. * - * @param elementComparator comparator for the contained elements. + * @param elementPriorityComparator comparator for the priority of contained elements. * @param minimumCapacity the minimum and initial capacity of this priority queue. */ @SuppressWarnings("unchecked") public HeapPriorityQueue( - @Nonnull Comparator<T> elementComparator, + @Nonnull PriorityComparator<T> elementPriorityComparator, @Nonnegative int minimumCapacity) { - this.elementComparator = elementComparator; + this.elementPriorityComparator = elementPriorityComparator; this.queue = (T[]) new HeapPriorityQueueElement[QUEUE_HEAD_INDEX + minimumCapacity]; } @Override + public void bulkPoll(@Nonnull Predicate<T> canConsume, @Nonnull Consumer<T> consumer) { + T element; + while ((element = peek()) != null && canConsume.test(element)) { + poll(); + consumer.accept(element); + } + } + + @Override @Nullable public T poll() { return size() > 0 ? 
removeElementAtIndex(QUEUE_HEAD_INDEX) : null; @@ -227,7 +238,7 @@ public class HeapPriorityQueue<T extends HeapPriorityQueueElement> implements In final T currentElement = heap[idx]; int parentIdx = idx >>> 1; - while (parentIdx > 0 && isElementLessThen(currentElement, heap[parentIdx])) { + while (parentIdx > 0 && isElementPriorityLessThen(currentElement, heap[parentIdx])) { moveElementToIdx(heap[parentIdx], idx); idx = parentIdx; parentIdx >>>= 1; @@ -245,19 +256,19 @@ public class HeapPriorityQueue<T extends HeapPriorityQueueElement> implements In int secondChildIdx = firstChildIdx + 1; if (isElementIndexValid(secondChildIdx, heapSize) && - isElementLessThen(heap[secondChildIdx], heap[firstChildIdx])) { + isElementPriorityLessThen(heap[secondChildIdx], heap[firstChildIdx])) { firstChildIdx = secondChildIdx; } while (isElementIndexValid(firstChildIdx, heapSize) && - isElementLessThen(heap[firstChildIdx], currentElement)) { + isElementPriorityLessThen(heap[firstChildIdx], currentElement)) { moveElementToIdx(heap[firstChildIdx], idx); idx = firstChildIdx; firstChildIdx = idx << 1; secondChildIdx = firstChildIdx + 1; if (isElementIndexValid(secondChildIdx, heapSize) && - isElementLessThen(heap[secondChildIdx], heap[firstChildIdx])) { + isElementPriorityLessThen(heap[secondChildIdx], heap[firstChildIdx])) { firstChildIdx = secondChildIdx; } } @@ -269,8 +280,8 @@ public class HeapPriorityQueue<T extends HeapPriorityQueueElement> implements In return elementIndex <= heapSize; } - private boolean isElementLessThen(T a, T b) { - return elementComparator.compare(a, b) < 0; + private boolean isElementPriorityLessThen(T a, T b) { + return elementPriorityComparator.comparePriority(a, b) < 0; } private void moveElementToIdx(T element, int idx) { http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java index 61313e9..79f319c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java @@ -18,20 +18,17 @@ package org.apache.flink.runtime.state.heap; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.state.KeyExtractorFunction; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; +import org.apache.flink.runtime.state.PriorityComparator; import javax.annotation.Nonnegative; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; -import java.util.List; import java.util.Set; import static org.apache.flink.util.Preconditions.checkArgument; @@ -49,7 +46,9 @@ import static org.apache.flink.util.Preconditions.checkArgument; * * @param <T> type of the contained elements. */ -public class HeapPriorityQueueSet<T extends HeapPriorityQueueElement> extends HeapPriorityQueue<T> { +public class HeapPriorityQueueSet<T extends HeapPriorityQueueElement> + extends HeapPriorityQueue<T> + implements KeyGroupedInternalPriorityQueue<T> { /** * Function to extract the key from contained elements. @@ -74,7 +73,7 @@ public class HeapPriorityQueueSet<T extends HeapPriorityQueueElement> extends He /** * Creates an empty {@link HeapPriorityQueueSet} with the requested initial capacity. * - * @param elementComparator comparator for the contained elements. + * @param elementPriorityComparator comparator for the priority of contained elements. 
* @param keyExtractor function to extract a key from the contained elements. * @param minimumCapacity the minimum and initial capacity of this priority queue. * @param keyGroupRange the key-group range of the elements in this set. @@ -82,13 +81,13 @@ public class HeapPriorityQueueSet<T extends HeapPriorityQueueElement> extends He */ @SuppressWarnings("unchecked") public HeapPriorityQueueSet( - @Nonnull Comparator<T> elementComparator, + @Nonnull PriorityComparator<T> elementPriorityComparator, @Nonnull KeyExtractorFunction<T> keyExtractor, @Nonnegative int minimumCapacity, @Nonnull KeyGroupRange keyGroupRange, @Nonnegative int totalNumberOfKeyGroups) { - super(elementComparator, minimumCapacity); + super(elementPriorityComparator, minimumCapacity); this.keyExtractor = keyExtractor; @@ -147,28 +146,9 @@ public class HeapPriorityQueueSet<T extends HeapPriorityQueueElement> extends He } } - /** - * Returns an unmodifiable set of all elements in the given key-group. - */ - @Nonnull - public Set<T> getElementsForKeyGroup(@Nonnegative int keyGroupIdx) { - return Collections.unmodifiableSet(getDedupMapForKeyGroup(keyGroupIdx).keySet()); - } - - @VisibleForTesting - @SuppressWarnings("unchecked") - @Nonnull - public List<Set<T>> getElementsByKeyGroup() { - List<Set<T>> result = new ArrayList<>(deduplicationMapsByKeyGroup.length); - for (int i = 0; i < deduplicationMapsByKeyGroup.length; ++i) { - result.add(i, Collections.unmodifiableSet(deduplicationMapsByKeyGroup[i].keySet())); - } - return result; - } - private HashMap<T, T> getDedupMapForKeyGroup( - @Nonnegative int keyGroupIdx) { - return deduplicationMapsByKeyGroup[globalKeyGroupToLocalIndex(keyGroupIdx)]; + @Nonnegative int keyGroupId) { + return deduplicationMapsByKeyGroup[globalKeyGroupToLocalIndex(keyGroupId)]; } private HashMap<T, T> getDedupMapForElement(T element) { @@ -182,4 +162,10 @@ public class HeapPriorityQueueSet<T extends HeapPriorityQueueElement> extends He 
checkArgument(keyGroupRange.contains(keyGroup)); return keyGroup - keyGroupRange.getStartKeyGroup(); } + + @Nonnull + @Override + public Set<T> getSubsetForKeyGroup(int keyGroupId) { + return java.util.Collections.unmodifiableSet(getDedupMapForKeyGroup(keyGroupId).keySet()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java new file mode 100644 index 0000000..ee6fda9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.KeyExtractorFunction; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; +import org.apache.flink.runtime.state.PriorityComparator; +import org.apache.flink.runtime.state.PriorityQueueSetFactory; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; + +/** + * Factory that creates heap-based {@link HeapPriorityQueueSet} instances, all covering the configured key-group range. + */ +public class HeapPriorityQueueSetFactory implements PriorityQueueSetFactory { + + @Nonnull + private final KeyGroupRange keyGroupRange; + + @Nonnegative + private final int totalKeyGroups; + + @Nonnegative + private final int minimumCapacity; + + public HeapPriorityQueueSetFactory( + @Nonnull KeyGroupRange keyGroupRange, + @Nonnegative int totalKeyGroups, + @Nonnegative int minimumCapacity) { + + this.keyGroupRange = keyGroupRange; + this.totalKeyGroups = totalKeyGroups; + this.minimumCapacity = minimumCapacity; + } + + @Nonnull + @Override + public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create( + @Nonnull String stateName, + @Nonnull TypeSerializer<T> byteOrderedElementSerializer, + @Nonnull PriorityComparator<T> elementPriorityComparator, + @Nonnull KeyExtractorFunction<T> keyExtractor) { + return new HeapPriorityQueueSet<>( + elementPriorityComparator, + keyExtractor, + minimumCapacity, + keyGroupRange, + totalKeyGroups); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java index af4d54f..6f4f911 100644 ---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java @@ -22,7 +22,10 @@ import org.apache.flink.runtime.state.InternalPriorityQueue; import org.apache.flink.runtime.state.KeyExtractorFunction; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; +import org.apache.flink.runtime.state.PriorityComparator; import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.IOUtils; import javax.annotation.Nonnegative; @@ -30,7 +33,10 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.Collection; -import java.util.Comparator; +import java.util.HashSet; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Predicate; /** * This implementation of {@link InternalPriorityQueue} is internally partitioned into sub-queues per key-group and @@ -41,7 +47,7 @@ import java.util.Comparator; * @param <PQ> type type of sub-queue used for each key-group partition. */ public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueue<T> & HeapPriorityQueueElement> - implements InternalPriorityQueue<T> { + implements InternalPriorityQueue<T>, KeyGroupedInternalPriorityQueue<T> { /** A heap of heap sets. 
Each sub-heap represents the partition for a key-group.*/ @Nonnull @@ -66,7 +72,7 @@ public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueu @SuppressWarnings("unchecked") public KeyGroupPartitionedPriorityQueue( @Nonnull KeyExtractorFunction<T> keyExtractor, - @Nonnull Comparator<T> elementComparator, + @Nonnull PriorityComparator<T> elementPriorityComparator, @Nonnull PartitionQueueSetFactory<T, PQ> orderedCacheFactory, @Nonnull KeyGroupRange keyGroupRange, @Nonnegative int totalKeyGroups) { @@ -76,16 +82,25 @@ public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueu this.firstKeyGroup = keyGroupRange.getStartKeyGroup(); this.keyGroupedHeaps = (PQ[]) new InternalPriorityQueue[keyGroupRange.getNumberOfKeyGroups()]; this.heapOfkeyGroupedHeaps = new HeapPriorityQueue<>( - new InternalPriorityQueueComparator<>(elementComparator), + new InternalPriorityQueueComparator<>(elementPriorityComparator), keyGroupRange.getNumberOfKeyGroups()); for (int i = 0; i < keyGroupedHeaps.length; i++) { final PQ keyGroupSubHeap = - orderedCacheFactory.create(firstKeyGroup + i, totalKeyGroups, elementComparator); + orderedCacheFactory.create(firstKeyGroup + i, totalKeyGroups, elementPriorityComparator); keyGroupedHeaps[i] = keyGroupSubHeap; heapOfkeyGroupedHeaps.add(keyGroupSubHeap); } } + @Override + public void bulkPoll(@Nonnull Predicate<T> canConsume, @Nonnull Consumer<T> consumer) { + T element; + while ((element = peek()) != null && canConsume.test(element)) { + poll(); + consumer.accept(element); + } + } + @Nullable @Override public T poll() { @@ -173,9 +188,28 @@ public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueu private int computeKeyGroupIndex(T element) { final Object extractKeyFromElement = keyExtractor.extractKeyFromElement(element); final int keyGroupId = KeyGroupRangeAssignment.assignToKeyGroup(extractKeyFromElement, totalKeyGroups); + return globalKeyGroupToLocalIndex(keyGroupId); + } + + 
private int globalKeyGroupToLocalIndex(int keyGroupId) { return keyGroupId - firstKeyGroup; } + @Nonnull + @Override + public Set<T> getSubsetForKeyGroup(int keyGroupId) { + HashSet<T> result = new HashSet<>(); + PQ partitionQueue = keyGroupedHeaps[globalKeyGroupToLocalIndex(keyGroupId)]; + try (CloseableIterator<T> iterator = partitionQueue.iterator()) { + while (iterator.hasNext()) { + result.add(iterator.next()); + } + } catch (Exception e) { + throw new FlinkRuntimeException("Exception while iterating key group.", e); + } + return result; + } + /** * Iterator for {@link KeyGroupPartitionedPriorityQueue}. This iterator is not guaranteeing any order of elements. * Using code must {@link #close()} after usage. @@ -236,24 +270,24 @@ public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueu * @param <Q> type of queue. */ private static final class InternalPriorityQueueComparator<T, Q extends InternalPriorityQueue<T>> - implements Comparator<Q> { + implements PriorityComparator<Q> { /** Comparator for the queue elements, so we can compare their heads. */ @Nonnull - private final Comparator<T> elementComparator; + private final PriorityComparator<T> elementPriorityComparator; - InternalPriorityQueueComparator(@Nonnull Comparator<T> elementComparator) { - this.elementComparator = elementComparator; + InternalPriorityQueueComparator(@Nonnull PriorityComparator<T> elementPriorityComparator) { + this.elementPriorityComparator = elementPriorityComparator; } @Override - public int compare(Q o1, Q o2) { + public int comparePriority(Q o1, Q o2) { final T left = o1.peek(); final T right = o2.peek(); if (left == null) { return (right == null ? 0 : 1); } else { - return (right == null ? -1 : elementComparator.compare(left, right)); + return (right == null ? 
-1 : elementPriorityComparator.comparePriority(left, right)); } } } @@ -271,10 +305,13 @@ public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueu * * @param keyGroupId the key-group of the elements managed by the produced queue. * @param numKeyGroups the total number of key-groups in the job. - * @param elementComparator the comparator that determines the order of the managed elements. + * @param elementPriorityComparator the comparator that determines the order of managed elements by priority. * @return a new queue for the given key-group. */ @Nonnull - PQS create(@Nonnegative int keyGroupId, @Nonnegative int numKeyGroups, @Nonnull Comparator<T> elementComparator); + PQS create( + @Nonnegative int keyGroupId, + @Nonnegative int numKeyGroups, + @Nonnull PriorityComparator<T> elementPriorityComparator); } } http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCache.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCache.java index 0e7d9dd..14c281e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCache.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.state.heap; +import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.Preconditions; import it.unimi.dsi.fastutil.objects.ObjectAVLTreeSet; @@ -125,4 +126,10 @@ public class TreeOrderedSetCache<E> implements CachingInternalPriorityQueueSet.O public E peekLast() { return !avlTree.isEmpty() ? 
avlTree.last() : null; } + + @Nonnull + @Override + public CloseableIterator<E> orderedIterator() { + return CloseableIterator.adapterForIterator(avlTree.iterator()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java index 3da60e4..d78944c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; import org.apache.flink.util.TernaryBoolean; import javax.annotation.Nullable; @@ -309,7 +310,8 @@ public class MemoryStateBackend extends AbstractFileStateBackend implements Conf TaskKvStateRegistry kvStateRegistry) { TaskStateManager taskStateManager = env.getTaskStateManager(); - + HeapPriorityQueueSetFactory priorityQueueSetFactory = + new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128); return new HeapKeyedStateBackend<>( kvStateRegistry, keySerializer, @@ -318,7 +320,8 @@ public class MemoryStateBackend extends AbstractFileStateBackend implements Conf keyGroupRange, isUsingAsynchronousSnapshots(), env.getExecutionConfig(), - taskStateManager.createLocalRecoveryConfig()); + taskStateManager.createLocalRecoveryConfig(), + priorityQueueSetFactory); } // 
------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java index c0c3ba4..0cd551c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java @@ -51,8 +51,16 @@ public abstract class InternalPriorityQueueTestBase extends TestLogger { protected static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 2); protected static final KeyExtractorFunction<TestElement> KEY_EXTRACTOR_FUNCTION = TestElement::getKey; - protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR = - Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey); + protected static final PriorityComparator<TestElement> TEST_ELEMENT_PRIORITY_COMPARATOR = + (left, right) -> Long.compare(left.getPriority(), right.getPriority()); + protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR = (o1, o2) -> { + int priorityCmp = TEST_ELEMENT_PRIORITY_COMPARATOR.comparePriority(o1, o2); + if (priorityCmp != 0) { + return priorityCmp; + } + // to fully comply with compareTo/equals contract. 
+ return Long.compare(o1.getKey(), o2.getKey()); + }; protected static void insertRandomElements( @Nonnull InternalPriorityQueue<TestElement> priorityQueue, http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java index 3c06b71..dfcdffc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java @@ -53,7 +53,8 @@ public class StateSnapshotCompressionTest extends TestLogger { new KeyGroupRange(0, 15), true, executionConfig, - TestLocalRecoveryConfig.disabled()); + TestLocalRecoveryConfig.disabled(), + mock(PriorityQueueSetFactory.class)); try { Assert.assertTrue( @@ -75,7 +76,8 @@ public class StateSnapshotCompressionTest extends TestLogger { new KeyGroupRange(0, 15), true, executionConfig, - TestLocalRecoveryConfig.disabled()); + TestLocalRecoveryConfig.disabled(), + mock(PriorityQueueSetFactory.class)); try { Assert.assertTrue( @@ -115,7 +117,8 @@ public class StateSnapshotCompressionTest extends TestLogger { new KeyGroupRange(0, 15), true, executionConfig, - TestLocalRecoveryConfig.disabled()); + TestLocalRecoveryConfig.disabled(), + mock(PriorityQueueSetFactory.class)); try { @@ -156,7 +159,8 @@ public class StateSnapshotCompressionTest extends TestLogger { new KeyGroupRange(0, 15), true, executionConfig, - TestLocalRecoveryConfig.disabled()); + TestLocalRecoveryConfig.disabled(), + mock(PriorityQueueSetFactory.class)); try { stateBackend.restore(StateObjectCollection.singleton(stateHandle)); 
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetTest.java index 618da4e..415497d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetTest.java @@ -25,7 +25,7 @@ public class HeapPriorityQueueSetTest extends HeapPriorityQueueTest { @Override protected HeapPriorityQueueSet<TestElement> newPriorityQueue(int initialCapacity) { return new HeapPriorityQueueSet<>( - TEST_ELEMENT_COMPARATOR, + TEST_ELEMENT_PRIORITY_COMPARATOR, KEY_EXTRACTOR_FUNCTION, initialCapacity, KEY_GROUP_RANGE, http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueTest.java index 8ffb8b8..6ba5a68 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueTest.java @@ -89,7 +89,7 @@ public class HeapPriorityQueueTest extends InternalPriorityQueueTestBase { @Override protected HeapPriorityQueue<TestElement> newPriorityQueue(int initialCapacity) { - return new HeapPriorityQueue<>(TEST_ELEMENT_COMPARATOR, initialCapacity); + return new HeapPriorityQueue<>(TEST_ELEMENT_PRIORITY_COMPARATOR, initialCapacity); } @Override 
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java index bf428dc..cf6aef4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java @@ -49,14 +49,18 @@ public abstract class HeapStateBackendTestBase { } public <K> HeapKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer) throws Exception { + final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 15); + final int numKeyGroups = keyGroupRange.getNumberOfKeyGroups(); + return new HeapKeyedStateBackend<>( mock(TaskKvStateRegistry.class), keySerializer, HeapStateBackendTestBase.class.getClassLoader(), - 16, - new KeyGroupRange(0, 15), + numKeyGroups, + keyGroupRange, async, new ExecutionConfig(), - TestLocalRecoveryConfig.disabled()); + TestLocalRecoveryConfig.disabled(), + new HeapPriorityQueueSetFactory(keyGroupRange, numKeyGroups, 128)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueueTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueueTest.java index 277de19..d348e10 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueueTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueueTest.java @@ -29,7 +29,7 @@ public class KeyGroupPartitionedPriorityQueueTest extends InternalPriorityQueueT protected InternalPriorityQueue<TestElement> newPriorityQueue(int initialCapacity) { return new KeyGroupPartitionedPriorityQueue<>( KEY_EXTRACTOR_FUNCTION, - TEST_ELEMENT_COMPARATOR, + TEST_ELEMENT_PRIORITY_COMPARATOR, newFactory(initialCapacity), KEY_GROUP_RANGE, KEY_GROUP_RANGE.getNumberOfKeyGroups()); } http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RockDBBackendOptions.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RockDBBackendOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RockDBBackendOptions.java new file mode 100644 index 0000000..ede45e3 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RockDBBackendOptions.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +/** + * Configuration options for the RocksDB backend. + */ +public class RockDBBackendOptions { + + /** + * Choice of implementation for priority queue state (e.g. timers). + */ + public static final ConfigOption<String> PRIORITY_QUEUE_STATE_TYPE = ConfigOptions + .key("backend.rocksdb.priority_queue_state_type") + .defaultValue(RocksDBStateBackend.PriorityQueueStateType.HEAP.name()) + .withDescription("This determines the implementation for the priority queue state (e.g. timers). Options are " + + "either " + RocksDBStateBackend.PriorityQueueStateType.HEAP.name() + " (heap-based, default) or " + + RocksDBStateBackend.PriorityQueueStateType.ROCKS.name() + " for an implementation based on RocksDB."); +} http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 21d2a65..f2430ae 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -58,14 +58,18 @@ import org.apache.flink.runtime.state.DirectoryStateHandle; import org.apache.flink.runtime.state.DoneFuture; import
org.apache.flink.runtime.state.IncrementalKeyedStateHandle; import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle; +import org.apache.flink.runtime.state.KeyExtractorFunction; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider; import org.apache.flink.runtime.state.PlaceholderStreamStateHandle; +import org.apache.flink.runtime.state.PriorityComparator; +import org.apache.flink.runtime.state.PriorityQueueSetFactory; import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator; import org.apache.flink.runtime.state.SnapshotDirectory; @@ -76,7 +80,13 @@ import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.runtime.state.StreamCompressionDecorator; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.TieBreakingPriorityComparator; import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator; +import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; +import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue; +import org.apache.flink.runtime.state.heap.TreeOrderedSetCache; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FileUtils; import org.apache.flink.util.FlinkRuntimeException; @@ -243,6 +253,9 @@ 
public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { /** The snapshot strategy, e.g., if we use full or incremental checkpoints, local state, and so on. */ private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> snapshotStrategy; + /** Factory for priority queue state, selected in the constructor from the configured priority queue state type. */ + private final PriorityQueueSetFactory priorityQueueFactory; + public RocksDBKeyedStateBackend( String operatorIdentifier, ClassLoader userCodeClassLoader, @@ -255,7 +268,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { KeyGroupRange keyGroupRange, ExecutionConfig executionConfig, boolean enableIncrementalCheckpointing, - LocalRecoveryConfig localRecoveryConfig + LocalRecoveryConfig localRecoveryConfig, + RocksDBStateBackend.PriorityQueueStateType priorityQueueStateType ) throws IOException { super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig); @@ -296,6 +310,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { this.writeOptions = new WriteOptions().setDisableWAL(true); + switch (priorityQueueStateType) { + case HEAP: + this.priorityQueueFactory = new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128); + break; + case ROCKS: + this.priorityQueueFactory = new RocksDBPriorityQueueSetFactory(); + break; + default: + throw new IllegalArgumentException("Unknown priority queue state type: " + priorityQueueStateType); + } + LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID); } @@ -378,6 +403,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { IOUtils.closeQuietly(columnMetaData.f0); } + // ... then close the priority queue related resources ... + if (priorityQueueFactory instanceof AutoCloseable) { + IOUtils.closeQuietly((AutoCloseable) priorityQueueFactory); + } + // ... and finally close the DB instance ...
IOUtils.closeQuietly(db); @@ -394,6 +424,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } } + @Nonnull + @Override + public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create( + @Nonnull String stateName, + @Nonnull TypeSerializer<T> byteOrderedElementSerializer, + @Nonnull PriorityComparator<T> elementComparator, + @Nonnull KeyExtractorFunction<T> keyExtractor) { + + return priorityQueueFactory.create(stateName, byteOrderedElementSerializer, elementComparator, keyExtractor); + } + private void cleanInstanceBasePath() { LOG.info("Deleting existing instance base directory {}.", instanceBasePath); @@ -1290,7 +1331,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { namespaceSerializer, stateDesc.getSerializer()); - ColumnFamilyHandle columnFamily = createColumnFamily(stateName); + ColumnFamilyHandle columnFamily = createColumnFamily(stateName, db); stateInfo = Tuple2.of(columnFamily, newMetaInfo); kvStateInformation.put(stateDesc.getName(), stateInfo); @@ -1302,7 +1343,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { /** * Creates a column family handle for use with a k/v state. 
*/ - private ColumnFamilyHandle createColumnFamily(String stateName) throws IOException { + private ColumnFamilyHandle createColumnFamily(String stateName, RocksDB db) { byte[] nameBytes = stateName.getBytes(ConfigConstants.DEFAULT_CHARSET); Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes), "The chosen state name 'default' collides with the name of the default column family!"); @@ -1312,7 +1353,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { try { return db.createColumnFamily(columnDescriptor); } catch (RocksDBException e) { - throw new IOException("Error creating ColumnFamilyHandle.", e); + throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", e); } } @@ -2579,4 +2620,126 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { ReadOptions readOptions) { return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle, readOptions)); } + + /** + * Encapsulates the logic and resources in connection with creating priority queue state structures. + */ + class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory, AutoCloseable { + + /** Default cache size per key-group. */ + private static final int DEFAULT_CACHES_SIZE = 8 * 1024; + + /** A shared buffer to serialize elements for the priority queue. */ + @Nonnull + private final ByteArrayOutputStreamWithPos elementSerializationOutStream; + + /** A shared adapter wrapper around elementSerializationOutStream to become a {@link DataOutputView}. */ + @Nonnull + private final DataOutputViewStreamWrapper elementSerializationOutView; + + /** A shared {@link RocksDBWriteBatchWrapper} to batch modifications to priority queues. */ + @Nonnull + private final RocksDBWriteBatchWrapper writeBatchWrapper; + + /** Map to track all column families created to back priority queues. 
*/ + @Nonnull + private final Map<String, ColumnFamilyHandle> priorityQueueColumnFamilies; + + /** The mandatory default column family, so that we can close it later. */ + @Nonnull + private final ColumnFamilyHandle defaultColumnFamily; + + /** Path of the RocksDB instance that holds the priority queues. */ + @Nonnull + private final File pqInstanceRocksDBPath; + + /** RocksDB instance that holds the priority queues. */ + @Nonnull + private final RocksDB pqDb; + + RocksDBPriorityQueueSetFactory() throws IOException { + this.pqInstanceRocksDBPath = new File(instanceBasePath, "pqdb"); + if (pqInstanceRocksDBPath.exists()) { + try { + FileUtils.deleteDirectory(pqInstanceRocksDBPath); + } catch (IOException ex) { + LOG.warn("Could not delete instance path for PQ RocksDB: " + pqInstanceRocksDBPath, ex); + } + } + List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1); + this.pqDb = openDB(pqInstanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles); + this.elementSerializationOutStream = new ByteArrayOutputStreamWithPos(); + this.elementSerializationOutView = new DataOutputViewStreamWrapper(elementSerializationOutStream); + this.writeBatchWrapper = new RocksDBWriteBatchWrapper(pqDb, writeOptions); + this.defaultColumnFamily = columnFamilyHandles.get(0); + this.priorityQueueColumnFamilies = new HashMap<>(); + } + + @Nonnull + @Override + public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create( + @Nonnull String stateName, + @Nonnull TypeSerializer<T> byteOrderedElementSerializer, + @Nonnull PriorityComparator<T> elementPriorityComparator, + @Nonnull KeyExtractorFunction<T> keyExtractor) { + + final ColumnFamilyHandle columnFamilyHandle = + priorityQueueColumnFamilies.computeIfAbsent( + stateName, + (name) -> RocksDBKeyedStateBackend.this.createColumnFamily(name, pqDb)); + + @Nonnull + TieBreakingPriorityComparator<T> tieBreakingComparator = + new TieBreakingPriorityComparator<>( + 
elementPriorityComparator, + byteOrderedElementSerializer, + elementSerializationOutStream, + elementSerializationOutView); + + return new KeyGroupPartitionedPriorityQueue<>( + keyExtractor, + elementPriorityComparator, + new KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<T, CachingInternalPriorityQueueSet<T>>() { + @Nonnull + @Override + public CachingInternalPriorityQueueSet<T> create( + int keyGroupId, + int numKeyGroups, + @Nonnull PriorityComparator<T> elementPriorityComparator) { + + CachingInternalPriorityQueueSet.OrderedSetCache<T> cache = + new TreeOrderedSetCache<>(tieBreakingComparator, DEFAULT_CACHES_SIZE); + CachingInternalPriorityQueueSet.OrderedSetStore<T> store = + new RocksDBOrderedSetStore<>( + keyGroupId, + keyGroupPrefixBytes, + pqDb, + columnFamilyHandle, + byteOrderedElementSerializer, + elementSerializationOutStream, + elementSerializationOutView, + writeBatchWrapper); + + return new CachingInternalPriorityQueueSet<>(cache, store); + } + }, + keyGroupRange, + numberOfKeyGroups); + } + + @Override + public void close() { + IOUtils.closeQuietly(writeBatchWrapper); + for (ColumnFamilyHandle columnFamilyHandle : priorityQueueColumnFamilies.values()) { + IOUtils.closeQuietly(columnFamilyHandle); + } + IOUtils.closeQuietly(defaultColumnFamily); + IOUtils.closeQuietly(pqDb); + try { + FileUtils.deleteDirectory(pqInstanceRocksDBPath); + } catch (IOException ex) { + LOG.warn("Could not delete instance path for PQ RocksDB: " + pqInstanceRocksDBPath, ex); + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java index e512933..5284314 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java @@ -28,7 +28,6 @@ import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.FlinkRuntimeException; import org.rocksdb.ColumnFamilyHandle; -import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; @@ -61,10 +60,6 @@ public class RocksDBOrderedSetStore<T> implements CachingInternalPriorityQueueSe @Nonnull private final ColumnFamilyHandle columnFamilyHandle; - /** Read options for RocksDB. */ - @Nonnull - private final ReadOptions readOptions; - /** * Serializer for the contained elements. The lexicographical order of the bytes of serialized objects must be * aligned with their logical order. 
@@ -93,14 +88,12 @@ public class RocksDBOrderedSetStore<T> implements CachingInternalPriorityQueueSe @Nonnegative int keyGroupPrefixBytes, @Nonnull RocksDB db, @Nonnull ColumnFamilyHandle columnFamilyHandle, - @Nonnull ReadOptions readOptions, @Nonnull TypeSerializer<T> byteOrderProducingSerializer, @Nonnull ByteArrayOutputStreamWithPos outputStream, @Nonnull DataOutputViewStreamWrapper outputView, @Nonnull RocksDBWriteBatchWrapper batchWrapper) { this.db = db; this.columnFamilyHandle = columnFamilyHandle; - this.readOptions = readOptions; this.byteOrderProducingSerializer = byteOrderProducingSerializer; this.outputStream = outputStream; this.outputView = outputView; @@ -169,7 +162,7 @@ public class RocksDBOrderedSetStore<T> implements CachingInternalPriorityQueueSe return new RocksToJavaIteratorAdapter( new RocksIteratorWrapper( - db.newIterator(columnFamilyHandle, readOptions))); + db.newIterator(columnFamilyHandle))); } /** @@ -232,6 +225,10 @@ public class RocksDBOrderedSetStore<T> implements CachingInternalPriorityQueueSe private RocksToJavaIteratorAdapter(@Nonnull RocksIteratorWrapper iterator) { this.iterator = iterator; try { + // TODO we could check if it is more efficient to make the seek more specific, e.g. with a provided hint + // that is lexicographically closer to the first expected element in the key-group. I wonder if this could + // help to improve the seek if there are many tombstones for elements at the beginning of the key-group + // (like for elements that have been removed in previous polling, before they are compacted away).
iterator.seek(groupPrefixBytes); deserializeNextElementIfAvailable(); } catch (Exception ex) { http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 81d6265..998521b 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -59,6 +59,7 @@ import java.util.List; import java.util.Random; import java.util.UUID; +import static org.apache.flink.contrib.streaming.state.RockDBBackendOptions.PRIORITY_QUEUE_STATE_TYPE; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -76,6 +77,14 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class RocksDBStateBackend extends AbstractStateBackend implements ConfigurableStateBackend { + /** + * The available options for the type of priority queue state. + */ + public enum PriorityQueueStateType { + HEAP, + ROCKS + } + private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackend.class); @@ -109,6 +118,9 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu /** This determines if incremental checkpointing is enabled. */ private final TernaryBoolean enableIncrementalCheckpointing; + /** This determines the type of priority queue state.
*/ + private final PriorityQueueStateType priorityQueueStateType; + // -- runtime values, set on TaskManager when initializing / using the backend /** Base paths for RocksDB directory, as initialized. */ @@ -221,6 +233,8 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu public RocksDBStateBackend(StateBackend checkpointStreamBackend, TernaryBoolean enableIncrementalCheckpointing) { this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend); this.enableIncrementalCheckpointing = enableIncrementalCheckpointing; + // for now, we still use the heap-based implementation as default + this.priorityQueueStateType = PriorityQueueStateType.HEAP; } /** @@ -256,6 +270,11 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu this.enableIncrementalCheckpointing = original.enableIncrementalCheckpointing.resolveUndefined( config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS)); + final String priorityQueueTypeString = config.getString(PRIORITY_QUEUE_STATE_TYPE.key(), ""); + + this.priorityQueueStateType = priorityQueueTypeString.length() > 0 ?
+ PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()) : original.priorityQueueStateType; + // configure local directories if (original.localRocksDbDirectories != null) { this.localRocksDbDirectories = original.localRocksDbDirectories; @@ -422,7 +441,8 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu keyGroupRange, env.getExecutionConfig(), isIncrementalCheckpointsEnabled(), - localRecoveryConfig); + localRecoveryConfig, + priorityQueueStateType); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/CachingInternalPriorityQueueSetWithRocksDBStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/CachingInternalPriorityQueueSetWithRocksDBStoreTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/CachingInternalPriorityQueueSetWithRocksDBStoreTest.java index ae20cf2..5f26835 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/CachingInternalPriorityQueueSetWithRocksDBStoreTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/CachingInternalPriorityQueueSetWithRocksDBStoreTest.java @@ -57,7 +57,6 @@ public class CachingInternalPriorityQueueSetWithRocksDBStoreTest extends Caching prefixBytes, rocksDBResource.getRocksDB(), rocksDBResource.getDefaultColumnFamily(), - rocksDBResource.getReadOptions(), TestElementSerializer.INSTANCE, outputStream, outputView, http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStoreTest.java 
---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStoreTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStoreTest.java index 256a83b..0b1d07b 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStoreTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStoreTest.java @@ -124,7 +124,6 @@ public class RocksDBOrderedSetStoreTest { keyGroupPrefixBytes, rocksDBResource.getRocksDB(), rocksDBResource.getDefaultColumnFamily(), - rocksDBResource.getReadOptions(), byteOrderSerializer, outputStreamWithPos, outputView, http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java index ad89583..69069d6 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -240,7 +240,8 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa new KeyGroupRange(0, 0), new ExecutionConfig(), enableIncrementalCheckpointing, - TestLocalRecoveryConfig.disabled()); + 
TestLocalRecoveryConfig.disabled(), + RocksDBStateBackend.PriorityQueueStateType.HEAP); verify(columnFamilyOptions, Mockito.times(1)) .setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME); http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 9915dd5..797a26a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -730,9 +730,11 @@ public abstract class AbstractStreamOperator<OUT> checkTimerServiceInitialization(); // the following casting is to overcome type restrictions. - TypeSerializer<K> keySerializer = (TypeSerializer<K>) getKeyedStateBackend().getKeySerializer(); + KeyedStateBackend<K> keyedStateBackend = getKeyedStateBackend(); + TypeSerializer<K> keySerializer = keyedStateBackend.getKeySerializer(); InternalTimeServiceManager<K> keyedTimeServiceHandler = (InternalTimeServiceManager<K>) timeServiceManager; - return keyedTimeServiceHandler.getInternalTimerService(name, keySerializer, namespaceSerializer, triggerable); + TimerSerializer<K, N> timerSerializer = new TimerSerializer<>(keySerializer, namespaceSerializer); + return keyedTimeServiceHandler.getInternalTimerService(name, timerSerializer, triggerable); } public void processWatermark(Watermark mark) throws Exception {