http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/OrderedSetCacheTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/OrderedSetCacheTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/OrderedSetCacheTestBase.java new file mode 100644 index 0000000..1d627ed --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/OrderedSetCacheTestBase.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.state.heap; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Random; +import java.util.TreeSet; + +/** + * Test base for instances of + * {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetCache}. 
+ */ +public abstract class OrderedSetCacheTestBase { + + @Test + public void testOrderedSetCacheContract() { + final Random random = new Random(0x42); + final int capacity = 5000; + final int keySpaceUpperBound = 100 * capacity; + final TreeSet<Integer> checkSet = new TreeSet<>(Integer::compareTo); + final CachingInternalPriorityQueueSet.OrderedSetCache<Integer> testInstance = createInstance(capacity); + + Assert.assertTrue(testInstance.isEmpty()); + + while (checkSet.size() < capacity) { + Assert.assertEquals(checkSet.size() >= capacity, testInstance.isFull()); + if (!checkSet.isEmpty() && random.nextInt(10) == 0) { + final int toDelete = pickContainedRandomElement(checkSet, random); + Assert.assertTrue(checkSet.remove(toDelete)); + testInstance.remove(toDelete); + } else { + final int randomValue = random.nextInt(keySpaceUpperBound); + checkSet.add(randomValue); + testInstance.add(randomValue); + } + Assert.assertEquals(checkSet.isEmpty(), testInstance.isEmpty()); + + Assert.assertEquals(checkSet.first(), testInstance.peekFirst()); + Assert.assertEquals(checkSet.last(), testInstance.peekLast()); + + Assert.assertFalse(testInstance.isInLowerBound(checkSet.last())); + Assert.assertTrue(testInstance.isInLowerBound(checkSet.last() - 1)); + } + + Assert.assertTrue(testInstance.isFull()); + Assert.assertFalse(testInstance.isInLowerBound(checkSet.last())); + Assert.assertTrue(testInstance.isInLowerBound(checkSet.last() - 1)); + + testInstance.remove(pickNotContainedRandomElement(checkSet, random, keySpaceUpperBound)); + Assert.assertTrue(testInstance.isFull()); + + int containedKey = pickContainedRandomElement(checkSet, random); + + Assert.assertTrue(checkSet.remove(containedKey)); + testInstance.remove(containedKey); + + Assert.assertFalse(testInstance.isFull()); + + for (int i = 0; i < capacity; ++i) { + if (random.nextInt(1) == 0) { + Assert.assertEquals(checkSet.pollFirst(), testInstance.removeFirst()); + } else { + Assert.assertEquals(checkSet.pollLast(), 
testInstance.removeLast()); + } + } + + Assert.assertFalse(testInstance.isFull()); + Assert.assertTrue(testInstance.isEmpty()); + } + + private int pickNotContainedRandomElement(TreeSet<Integer> checkSet, Random random, int upperBound) { + int notContainedKey; + do { + notContainedKey = random.nextInt(upperBound); + } while (checkSet.contains(notContainedKey)); + return notContainedKey; + } + + private int pickContainedRandomElement(TreeSet<Integer> checkSet, Random random) { + assert !checkSet.isEmpty(); + return checkSet.ceiling(1 + random.nextInt(checkSet.last())); + } + + protected abstract CachingInternalPriorityQueueSet.OrderedSetCache<Integer> createInstance(int capacity); +}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/SimpleCachingInternalPriorityQueueSetTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/SimpleCachingInternalPriorityQueueSetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/SimpleCachingInternalPriorityQueueSetTest.java new file mode 100644 index 0000000..4c4eb46 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/SimpleCachingInternalPriorityQueueSetTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +/** + * Test for {@link CachingInternalPriorityQueueSet}. 
+ */ +public class SimpleCachingInternalPriorityQueueSetTest extends CachingInternalPriorityQueueSetTestBase { + + @Override + protected CachingInternalPriorityQueueSet.OrderedSetStore<TestElement> createOrderedSetStore() { + return new TestOrderedStore<>(TEST_ELEMENT_COMPARATOR); + } + + @Override + protected CachingInternalPriorityQueueSet.OrderedSetCache<TestElement> createOrderedSetCache() { + return new TreeOrderedSetCache<>(TEST_ELEMENT_COMPARATOR, 3); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TestOrderedStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TestOrderedStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TestOrderedStore.java new file mode 100644 index 0000000..36a3341 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TestOrderedStore.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.util.CloseableIterator; + +import javax.annotation.Nonnull; + +import java.util.Comparator; +import java.util.TreeSet; + +/** + * Simple implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore} + * for tests. + */ +public class TestOrderedStore<T> implements CachingInternalPriorityQueueSet.OrderedSetStore<T> { + + private final TreeSet<T> treeSet; + + public TestOrderedStore(Comparator<T> comparator) { + this.treeSet = new TreeSet<>(comparator); + } + + @Override + public void add(@Nonnull T element) { + treeSet.add(element); + } + + @Override + public void remove(@Nonnull T element) { + treeSet.remove(element); + } + + @Override + public int size() { + return treeSet.size(); + } + + @Nonnull + @Override + public CloseableIterator<T> orderedIterator() { + return CloseableIterator.adapterForIterator(treeSet.iterator()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCacheTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCacheTest.java new file mode 100644 index 0000000..cfe823e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCacheTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +
/**
 * Runs the contract test of {@link OrderedSetCacheTestBase} against {@link TreeOrderedSetCache}.
 */
public class TreeOrderedSetCacheTest extends OrderedSetCacheTestBase {

	/** Creates a tree-based cache of the requested capacity that orders integers naturally. */
	@Override
	protected CachingInternalPriorityQueueSet.OrderedSetCache<Integer> createInstance(int capacity) {
		final CachingInternalPriorityQueueSet.OrderedSetCache<Integer> treeCache =
			new TreeOrderedSetCache<>(Integer::compareTo, capacity);
		return treeCache;
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java index fb7db34..5f1c650 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java @@ -149,4 +149,8 @@ class RocksDBKeySerializationUtils { private static byte extractByteAtPosition(int value, int byteIdx) { return (byte) ((value >>> (byteIdx << 3))); } + + public static int computeRequiredBytesInKeyGroupPrefix(int totalKeyGroupsInJob) { + return totalKeyGroupsInJob > (Byte.MAX_VALUE + 1) ? 
2 : 1; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index e5f443a..21d2a65 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -283,7 +283,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } this.localRecoveryConfig = Preconditions.checkNotNull(localRecoveryConfig); - this.keyGroupPrefixBytes = getNumberOfKeyGroups() > (Byte.MAX_VALUE + 1) ? 
2 : 1; + this.keyGroupPrefixBytes = + RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(getNumberOfKeyGroups()); this.kvStateInformation = new LinkedHashMap<>(); this.restoredKvStateMetaInfos = new HashMap<>(); this.materializedSstFiles = new TreeMap<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java new file mode 100644 index 0000000..e512933 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.FlinkRuntimeException; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.NoSuchElementException; + +/** + * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore} + * based on RocksDB. + * + * <p>IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences + * produced by the provided serializer for the elements! + * + * @param <T> the type of stored elements. + */ +public class RocksDBOrderedSetStore<T> implements CachingInternalPriorityQueueSet.OrderedSetStore<T> { + + /** Serialized empty value to insert into RocksDB. */ + private static final byte[] DUMMY_BYTES = new byte[] {0}; + + /** The RocksDB instance that serves as store. */ + @Nonnull + private final RocksDB db; + + /** Handle to the column family of the RocksDB instance in which the elements are stored. */ + @Nonnull + private final ColumnFamilyHandle columnFamilyHandle; + + /** Read options for RocksDB. */ + @Nonnull + private final ReadOptions readOptions; + + /** + * Serializer for the contained elements. The lexicographical order of the bytes of serialized objects must be + * aligned with their logical order. 
+ */ + @Nonnull + private final TypeSerializer<T> byteOrderProducingSerializer; + + /** Wrapper to batch all writes to RocksDB. */ + @Nonnull + private final RocksDBWriteBatchWrapper batchWrapper; + + /** The key-group id in serialized form. */ + @Nonnull + private final byte[] groupPrefixBytes; + + /** Output stream that helps to serialize elements. */ + @Nonnull + private final ByteArrayOutputStreamWithPos outputStream; + + /** Output view that helps to serialize elements, must wrap the output stream. */ + @Nonnull + private final DataOutputViewStreamWrapper outputView; + + public RocksDBOrderedSetStore( + @Nonnegative int keyGroupId, + @Nonnegative int keyGroupPrefixBytes, + @Nonnull RocksDB db, + @Nonnull ColumnFamilyHandle columnFamilyHandle, + @Nonnull ReadOptions readOptions, + @Nonnull TypeSerializer<T> byteOrderProducingSerializer, + @Nonnull ByteArrayOutputStreamWithPos outputStream, + @Nonnull DataOutputViewStreamWrapper outputView, + @Nonnull RocksDBWriteBatchWrapper batchWrapper) { + this.db = db; + this.columnFamilyHandle = columnFamilyHandle; + this.readOptions = readOptions; + this.byteOrderProducingSerializer = byteOrderProducingSerializer; + this.outputStream = outputStream; + this.outputView = outputView; + this.batchWrapper = batchWrapper; + this.groupPrefixBytes = createKeyGroupBytes(keyGroupId, keyGroupPrefixBytes); + } + + private byte[] createKeyGroupBytes(int keyGroupId, int numPrefixBytes) { + + outputStream.reset(); + + try { + RocksDBKeySerializationUtils.writeKeyGroup(keyGroupId, numPrefixBytes, outputView); + } catch (IOException e) { + throw new FlinkRuntimeException("Could not write key-group bytes.", e); + } + + return outputStream.toByteArray(); + } + + @Override + public void add(@Nonnull T element) { + byte[] elementBytes = serializeElement(element); + try { + batchWrapper.put(columnFamilyHandle, elementBytes, DUMMY_BYTES); + } catch (RocksDBException e) { + throw new FlinkRuntimeException("Error while getting element from 
RocksDB.", e); + } + } + + @Override + public void remove(@Nonnull T element) { + byte[] elementBytes = serializeElement(element); + try { + batchWrapper.remove(columnFamilyHandle, elementBytes); + } catch (RocksDBException e) { + throw new FlinkRuntimeException("Error while removing element from RocksDB.", e); + } + } + + /** + * This implementation comes at a relatively high cost per invocation. It should not be called repeatedly when it is + * clear that the value did not change. Currently this is only truly used to realize certain higher-level tests. + * + * @see org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore + */ + @Override + public int size() { + + int count = 0; + try (final RocksToJavaIteratorAdapter iterator = orderedIterator()) { + while (iterator.hasNext()) { + iterator.next(); + ++count; + } + } + + return count; + } + + @Nonnull + @Override + public RocksToJavaIteratorAdapter orderedIterator() { + + flushWriteBatch(); + + return new RocksToJavaIteratorAdapter( + new RocksIteratorWrapper( + db.newIterator(columnFamilyHandle, readOptions))); + } + + /** + * Ensures that recent writes are flushed and reflect in the RocksDB instance. 
+ */ + private void flushWriteBatch() { + try { + batchWrapper.flush(); + } catch (RocksDBException e) { + throw new FlinkRuntimeException(e); + } + } + + private static boolean isPrefixWith(byte[] bytes, byte[] prefixBytes) { + for (int i = 0; i < prefixBytes.length; ++i) { + if (bytes[i] != prefixBytes[i]) { + return false; + } + } + return true; + } + + private byte[] serializeElement(T element) { + try { + outputStream.reset(); + outputView.write(groupPrefixBytes); + byteOrderProducingSerializer.serialize(element, outputView); + return outputStream.toByteArray(); + } catch (IOException e) { + throw new FlinkRuntimeException("Error while serializing the element.", e); + } + } + + private T deserializeElement(byte[] bytes) { + try { + // TODO introduce a stream in which we can change the internal byte[] to avoid creating instances per call + ByteArrayInputStreamWithPos inputStream = new ByteArrayInputStreamWithPos(bytes); + DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(inputStream); + inputView.skipBytes(groupPrefixBytes.length); + return byteOrderProducingSerializer.deserialize(inputView); + } catch (IOException e) { + throw new FlinkRuntimeException("Error while deserializing the element.", e); + } + } + + /** + * Adapter between RocksDB iterator and Java iterator. This is also closeable to release the native resources after + * use. + */ + private class RocksToJavaIteratorAdapter implements CloseableIterator<T> { + + /** The RocksDb iterator to which we forward ops. */ + @Nonnull + private final RocksIteratorWrapper iterator; + + /** Cache for the current element of the iteration. */ + @Nullable + private T currentElement; + + private RocksToJavaIteratorAdapter(@Nonnull RocksIteratorWrapper iterator) { + this.iterator = iterator; + try { + iterator.seek(groupPrefixBytes); + deserializeNextElementIfAvailable(); + } catch (Exception ex) { + // ensure resource cleanup also in the face of (runtime) exceptions in the constructor. 
+ iterator.close(); + throw new FlinkRuntimeException("Could not initialize ordered iterator.", ex); + } + } + + @Override + public void close() { + iterator.close(); + } + + @Override + public boolean hasNext() { + return currentElement != null; + } + + @Override + public T next() { + final T returnElement = this.currentElement; + if (returnElement == null) { + throw new NoSuchElementException("Iterator has no more elements!"); + } + iterator.next(); + deserializeNextElementIfAvailable(); + return returnElement; + } + + private void deserializeNextElementIfAvailable() { + if (iterator.isValid()) { + final byte[] elementBytes = iterator.key(); + if (isPrefixWith(elementBytes, groupPrefixBytes)) { + this.currentElement = deserializeElement(elementBytes); + } else { + this.currentElement = null; + } + } else { + this.currentElement = null; + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/CachingInternalPriorityQueueSetWithRocksDBStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/CachingInternalPriorityQueueSetWithRocksDBStoreTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/CachingInternalPriorityQueueSetWithRocksDBStoreTest.java new file mode 100644 index 0000000..ae20cf2 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/CachingInternalPriorityQueueSetWithRocksDBStoreTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet; +import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSetTestBase; +import org.apache.flink.runtime.state.heap.TreeOrderedSetCache; + +import org.junit.Rule; + +/** + * Test for {@link CachingInternalPriorityQueueSet} with a + * {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore} based on RocksDB. 
+ */ +public class CachingInternalPriorityQueueSetWithRocksDBStoreTest extends CachingInternalPriorityQueueSetTestBase { + + @Rule + public final RocksDBResource rocksDBResource = new RocksDBResource(); + + @Override + protected CachingInternalPriorityQueueSet.OrderedSetStore<TestElement> createOrderedSetStore() { + return createRocksDBStore(0, 1, rocksDBResource); + } + + @Override + protected CachingInternalPriorityQueueSet.OrderedSetCache<TestElement> createOrderedSetCache() { + return new TreeOrderedSetCache<>(TEST_ELEMENT_COMPARATOR, 32); + } + + public static CachingInternalPriorityQueueSet.OrderedSetStore<TestElement> createRocksDBStore( + int keyGroupId, + int totalKeyGroups, + RocksDBResource rocksDBResource) { + ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(16); + DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream); + int prefixBytes = RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(totalKeyGroups); + return new RocksDBOrderedSetStore<>( + keyGroupId, + prefixBytes, + rocksDBResource.getRocksDB(), + rocksDBResource.getDefaultColumnFamily(), + rocksDBResource.getReadOptions(), + TestElementSerializer.INSTANCE, + outputStream, + outputView, + rocksDBResource.getBatchWrapper()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java new file mode 100644 index 0000000..42782a4 --- /dev/null +++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet; +import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue; +import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueueTest; +import org.apache.flink.runtime.state.heap.TreeOrderedSetCache; + +import org.junit.Rule; + +/** + * Test of {@link KeyGroupPartitionedPriorityQueue} powered by a {@link RocksDBOrderedSetStore}. 
+ */ +public class KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest extends KeyGroupPartitionedPriorityQueueTest { + + @Rule + public final RocksDBResource rocksDBResource = new RocksDBResource(); + + @Override + protected KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory< + TestElement, CachingInternalPriorityQueueSet<TestElement>> newFactory( + int initialCapacity) { + + return (keyGroupId, numKeyGroups, elementComparator) -> { + CachingInternalPriorityQueueSet.OrderedSetCache<TestElement> cache = + new TreeOrderedSetCache<>(TEST_ELEMENT_COMPARATOR, 32); + CachingInternalPriorityQueueSet.OrderedSetStore<TestElement> store = + RocksDBOrderedSetStoreTest.createRocksDBOrderedStore( + rocksDBResource, + TestElementSerializer.INSTANCE, + keyGroupId, + numKeyGroups); + return new CachingInternalPriorityQueueSet<>(cache, store); + }; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStoreTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStoreTest.java new file mode 100644 index 0000000..256a83b --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStoreTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet; +import org.apache.flink.util.CloseableIterator; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; + +import java.util.NoSuchElementException; + +/** + * Test for {@link RocksDBOrderedSetStore}. 
+ */ +public class RocksDBOrderedSetStoreTest { + + @Rule + public final RocksDBResource rocksDBResource = new RocksDBResource(); + + @Test + public void testOrderedIterator() throws Exception { + CachingInternalPriorityQueueSet.OrderedSetStore<Integer> store = createRocksDBOrderedStore(); + + //test empty iterator + try (final CloseableIterator<Integer> emptyIterator = store.orderedIterator()) { + Assert.assertFalse(emptyIterator.hasNext()); + try { + emptyIterator.next(); + Assert.fail(); + } catch (NoSuchElementException expected) { + } + } + + store.add(43); + store.add(42); + store.add(41); + store.add(41); + store.remove(42); + + // test in-order iteration + try (final CloseableIterator<Integer> iterator = store.orderedIterator()) { + Assert.assertTrue(iterator.hasNext()); + Assert.assertEquals(Integer.valueOf(41), iterator.next()); + Assert.assertTrue(iterator.hasNext()); + Assert.assertEquals(Integer.valueOf(43), iterator.next()); + Assert.assertFalse(iterator.hasNext()); + try { + iterator.next(); + Assert.fail(); + } catch (NoSuchElementException expected) { + } + } + } + + @Test + public void testAddRemoveSize() { + + CachingInternalPriorityQueueSet.OrderedSetStore<Integer> store = createRocksDBOrderedStore(); + + // test empty size + Assert.assertEquals(0, store.size()); + + // test add uniques + store.remove(41); + Assert.assertEquals(0, store.size()); + store.add(41); + Assert.assertEquals(1, store.size()); + store.add(42); + Assert.assertEquals(2, store.size()); + store.add(43); + Assert.assertEquals(3, store.size()); + store.add(44); + Assert.assertEquals(4, store.size()); + store.add(45); + Assert.assertEquals(5, store.size()); + + // test remove + store.remove(41); + Assert.assertEquals(4, store.size()); + store.remove(41); + Assert.assertEquals(4, store.size()); + + // test set semantics by attempt to insert duplicate + store.add(42); + Assert.assertEquals(4, store.size()); + } + + public static <E> RocksDBOrderedSetStore<E> 
createRocksDBOrderedStore( + @Nonnull RocksDBResource rocksDBResource, + @Nonnull TypeSerializer<E> byteOrderSerializer, + @Nonnegative int keyGroupId, + @Nonnegative int totalKeyGroups) { + + ByteArrayOutputStreamWithPos outputStreamWithPos = new ByteArrayOutputStreamWithPos(32); + DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStreamWithPos); + int keyGroupPrefixBytes = RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(totalKeyGroups); + return new RocksDBOrderedSetStore<>( + keyGroupId, + keyGroupPrefixBytes, + rocksDBResource.getRocksDB(), + rocksDBResource.getDefaultColumnFamily(), + rocksDBResource.getReadOptions(), + byteOrderSerializer, + outputStreamWithPos, + outputView, + rocksDBResource.getBatchWrapper()); + } + + protected RocksDBOrderedSetStore<Integer> createRocksDBOrderedStore() { + return createRocksDBOrderedStore(rocksDBResource, IntSerializer.INSTANCE, 0, 1); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java new file mode 100644 index 0000000..0407cc7 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.IOUtils; + +import org.junit.rules.ExternalResource; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.DBOptions; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.WriteOptions; + +import javax.annotation.Nonnull; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * External resource for tests that require an instance of RocksDB. + */ +public class RocksDBResource extends ExternalResource { + + /** Factory for {@link DBOptions} and {@link ColumnFamilyOptions}. */ + private final OptionsFactory optionsFactory; + + /** Temporary folder that provides the working directory for the RocksDB instance. */ + private TemporaryFolder temporaryFolder; + + /** The options for the RocksDB instance. */ + private DBOptions dbOptions; + + /** The options for column families created with the RocksDB instance. */ + private ColumnFamilyOptions columnFamilyOptions; + + /** The options for writes. */ + private WriteOptions writeOptions; + + /** The options for reads. */ + private ReadOptions readOptions; + + /** The RocksDB instance object. 
*/ + private RocksDB rocksDB; + + /** List of all column families that have been created with the RocksDB instance. */ + private List<ColumnFamilyHandle> columnFamilyHandles; + + /** Wrapper for batched writes to the RocksDB instance. */ + private RocksDBWriteBatchWrapper batchWrapper; + + public RocksDBResource() { + this(new OptionsFactory() { + @Override + public DBOptions createDBOptions(DBOptions currentOptions) { + return PredefinedOptions.FLASH_SSD_OPTIMIZED.createDBOptions(); + } + + @Override + public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) { + return PredefinedOptions.FLASH_SSD_OPTIMIZED.createColumnOptions(); + } + }); + } + + public RocksDBResource(@Nonnull OptionsFactory optionsFactory) { + this.optionsFactory = optionsFactory; + } + + public ColumnFamilyHandle getDefaultColumnFamily() { + return columnFamilyHandles.get(0); + } + + public WriteOptions getWriteOptions() { + return writeOptions; + } + + public RocksDB getRocksDB() { + return rocksDB; + } + + public ReadOptions getReadOptions() { + return readOptions; + } + + public RocksDBWriteBatchWrapper getBatchWrapper() { + return batchWrapper; + } + + /** + * Creates and returns a new column family with the given name. + */ + public ColumnFamilyHandle createNewColumnFamily(String name) { + try { + final ColumnFamilyHandle columnFamily = rocksDB.createColumnFamily( + new ColumnFamilyDescriptor(name.getBytes(), columnFamilyOptions)); + columnFamilyHandles.add(columnFamily); + return columnFamily; + } catch (Exception ex) { + throw new FlinkRuntimeException("Could not create column family.", ex); + } + } + + @Override + protected void before() throws Throwable { + this.temporaryFolder = new TemporaryFolder(); + this.temporaryFolder.create(); + final File rocksFolder = temporaryFolder.newFolder(); + this.dbOptions = optionsFactory.createDBOptions(PredefinedOptions.DEFAULT.createDBOptions()). 
+ setCreateIfMissing(true); + this.columnFamilyOptions = optionsFactory.createColumnOptions(PredefinedOptions.DEFAULT.createColumnOptions()); + this.writeOptions = new WriteOptions(); + this.writeOptions.disableWAL(); + this.readOptions = new ReadOptions(); + this.columnFamilyHandles = new ArrayList<>(1); + this.rocksDB = RocksDB.open( + dbOptions, + rocksFolder.getAbsolutePath(), + Collections.singletonList(new ColumnFamilyDescriptor("default".getBytes(), columnFamilyOptions)), + columnFamilyHandles); + this.batchWrapper = new RocksDBWriteBatchWrapper(rocksDB, writeOptions); + } + + @Override + protected void after() { + // destruct in reversed order of creation. + IOUtils.closeQuietly(this.batchWrapper); + for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { + IOUtils.closeQuietly(columnFamilyHandle); + } + IOUtils.closeQuietly(this.rocksDB); + IOUtils.closeQuietly(this.readOptions); + IOUtils.closeQuietly(this.writeOptions); + IOUtils.closeQuietly(this.columnFamilyOptions); + IOUtils.closeQuietly(this.dbOptions); + temporaryFolder.delete(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java index c5a68fb..7bf652f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java @@ -22,9 +22,13 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.CompatibilityResult; import 
org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.InternalPriorityQueue; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueSet; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; import java.util.List; @@ -46,12 +50,12 @@ public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, /** * Processing time timers that are currently in-flight. */ - private final InternalTimerHeap<K, N> processingTimeTimersQueue; + private final HeapPriorityQueueSet<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue; /** * Event time timers that are currently in-flight. */ - private final InternalTimerHeap<K, N> eventTimeTimersQueue; + private final HeapPriorityQueueSet<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue; /** * Information concerning the local key-group range. 
@@ -106,8 +110,8 @@ public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, } this.localKeyGroupRangeStartIdx = startIdx; - this.eventTimeTimersQueue = new InternalTimerHeap<>(128, localKeyGroupRange, totalKeyGroups); - this.processingTimeTimersQueue = new InternalTimerHeap<>(128, localKeyGroupRange, totalKeyGroups); + this.eventTimeTimersQueue = createPriorityQueue(localKeyGroupRange, totalKeyGroups); + this.processingTimeTimersQueue = createPriorityQueue(localKeyGroupRange, totalKeyGroups); } /** @@ -188,7 +192,7 @@ public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, @Override public void registerProcessingTimeTimer(N namespace, long time) { InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek(); - if (processingTimeTimersQueue.scheduleTimer(time, (K) keyContext.getCurrentKey(), namespace)) { + if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) { long nextTriggerTime = oldHead != null ? 
oldHead.getTimestamp() : Long.MAX_VALUE; // check if we need to re-schedule our timer to earlier if (time < nextTriggerTime) { @@ -202,17 +206,17 @@ public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, @Override public void registerEventTimeTimer(N namespace, long time) { - eventTimeTimersQueue.scheduleTimer(time, (K) keyContext.getCurrentKey(), namespace); + eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace)); } @Override public void deleteProcessingTimeTimer(N namespace, long time) { - processingTimeTimersQueue.stopTimer(time, (K) keyContext.getCurrentKey(), namespace); + processingTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace)); } @Override public void deleteEventTimeTimer(N namespace, long time) { - eventTimeTimersQueue.stopTimer(time, (K) keyContext.getCurrentKey(), namespace); + eventTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace)); } @Override @@ -256,12 +260,12 @@ public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, */ public InternalTimersSnapshot<K, N> snapshotTimersForKeyGroup(int keyGroupIdx) { return new InternalTimersSnapshot<>( - keySerializer, - keySerializer.snapshotConfiguration(), - namespaceSerializer, - namespaceSerializer.snapshotConfiguration(), - eventTimeTimersQueue.getTimersForKeyGroup(keyGroupIdx), - processingTimeTimersQueue.getTimersForKeyGroup(keyGroupIdx)); + keySerializer, + keySerializer.snapshotConfiguration(), + namespaceSerializer, + namespaceSerializer.snapshotConfiguration(), + eventTimeTimersQueue.getElementsForKeyGroup(keyGroupIdx), + processingTimeTimersQueue.getElementsForKeyGroup(keyGroupIdx)); } /** @@ -287,36 +291,43 @@ public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, "Key Group " + keyGroupIdx + " does not belong to the local range."); // restore the event time timers - 
eventTimeTimersQueue.bulkAddRestoredTimers(restoredTimersSnapshot.getEventTimeTimers()); + eventTimeTimersQueue.addAll(restoredTimersSnapshot.getEventTimeTimers()); // restore the processing time timers - processingTimeTimersQueue.bulkAddRestoredTimers(restoredTimersSnapshot.getProcessingTimeTimers()); + processingTimeTimersQueue.addAll(restoredTimersSnapshot.getProcessingTimeTimers()); } + @VisibleForTesting public int numProcessingTimeTimers() { return this.processingTimeTimersQueue.size(); } + @VisibleForTesting public int numEventTimeTimers() { return this.eventTimeTimersQueue.size(); } + @VisibleForTesting public int numProcessingTimeTimers(N namespace) { - int count = 0; - for (InternalTimer<K, N> timer : processingTimeTimersQueue) { - if (timer.getNamespace().equals(namespace)) { - count++; - } - } - return count; + return countTimersInNamespaceInternal(namespace, processingTimeTimersQueue); } + @VisibleForTesting public int numEventTimeTimers(N namespace) { + return countTimersInNamespaceInternal(namespace, eventTimeTimersQueue); + } + + private int countTimersInNamespaceInternal(N namespace, InternalPriorityQueue<TimerHeapInternalTimer<K, N>> queue) { int count = 0; - for (InternalTimer<K, N> timer : eventTimeTimersQueue) { - if (timer.getNamespace().equals(namespace)) { - count++; + try (final CloseableIterator<TimerHeapInternalTimer<K, N>> iterator = queue.iterator()) { + while (iterator.hasNext()) { + final TimerHeapInternalTimer<K, N> timer = iterator.next(); + if (timer.getNamespace().equals(namespace)) { + count++; + } } + } catch (Exception e) { + throw new FlinkRuntimeException("Exception when closing iterator.", e); } return count; } @@ -327,17 +338,28 @@ public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, } @VisibleForTesting - List<Set<InternalTimer<K, N>>> getEventTimeTimersPerKeyGroup() { - return eventTimeTimersQueue.getTimersByKeyGroup(); + List<Set<TimerHeapInternalTimer<K, N>>> getEventTimeTimersPerKeyGroup() { 
+ return eventTimeTimersQueue.getElementsByKeyGroup(); } @VisibleForTesting - List<Set<InternalTimer<K, N>>> getProcessingTimeTimersPerKeyGroup() { - return processingTimeTimersQueue.getTimersByKeyGroup(); + List<Set<TimerHeapInternalTimer<K, N>>> getProcessingTimeTimersPerKeyGroup() { + return processingTimeTimersQueue.getElementsByKeyGroup(); } private boolean areSnapshotSerializersIncompatible(InternalTimersSnapshot<?, ?> restoredSnapshot) { return (this.keyDeserializer != null && !this.keyDeserializer.equals(restoredSnapshot.getKeySerializer())) || (this.namespaceDeserializer != null && !this.namespaceDeserializer.equals(restoredSnapshot.getNamespaceSerializer())); } + + private static <K, N> HeapPriorityQueueSet<TimerHeapInternalTimer<K, N>> createPriorityQueue( + KeyGroupRange localKeyGroupRange, + int totalKeyGroups) { + return new HeapPriorityQueueSet<>( + TimerHeapInternalTimer.getTimerComparator(), + TimerHeapInternalTimer.getKeyExtractorFunction(), + 128, + localKeyGroupRange, + totalKeyGroups); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapPriorityQueueStateSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapPriorityQueueStateSnapshot.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapPriorityQueueStateSnapshot.java new file mode 100644 index 0000000..cc7fbc4 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapPriorityQueueStateSnapshot.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.KeyExtractorFunction; +import org.apache.flink.runtime.state.KeyGroupPartitioner; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.StateSnapshot; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueSet; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.lang.reflect.Array; + +/** + * This class represents the snapshot of an {@link HeapPriorityQueueSet}. + * + * @param <T> type of the state elements. + */ +public class HeapPriorityQueueStateSnapshot<T> implements StateSnapshot { + + /** Function that extracts keys from elements. */ + @Nonnull + private final KeyExtractorFunction<T> keyExtractor; + + /** Copy of the heap array containing all the (immutable or deeply copied) elements. */ + @Nonnull + private final T[] heapArrayCopy; + + /** The element serializer. */ + @Nonnull + private final TypeSerializer<T> elementSerializer; + + /** The key-group range covered by this snapshot. */ + @Nonnull + private final KeyGroupRange keyGroupRange; + + /** The total number of key-groups in the job. */ + @Nonnegative + private final int totalKeyGroups; + + /** Result of partitioning the snapshot by key-group. 
*/ + @Nullable + private KeyGroupPartitionedSnapshot partitionedSnapshot; + + HeapPriorityQueueStateSnapshot( + @Nonnull T[] heapArrayCopy, + @Nonnull KeyExtractorFunction<T> keyExtractor, + @Nonnull TypeSerializer<T> elementSerializer, + @Nonnull KeyGroupRange keyGroupRange, + @Nonnegative int totalKeyGroups) { + + // TODO ensure that the array contains a deep copy of elements if we are *not* dealing with immutable types. + assert elementSerializer.isImmutableType(); + + this.keyExtractor = keyExtractor; + this.heapArrayCopy = heapArrayCopy; + this.elementSerializer = elementSerializer; + this.keyGroupRange = keyGroupRange; + this.totalKeyGroups = totalKeyGroups; + } + + @SuppressWarnings("unchecked") + @Nonnull + @Override + public KeyGroupPartitionedSnapshot partitionByKeyGroup() { + + if (partitionedSnapshot == null) { + + T[] partitioningOutput = (T[]) Array.newInstance( + heapArrayCopy.getClass().getComponentType(), + heapArrayCopy.length); + + KeyGroupPartitioner<T> keyGroupPartitioner = + new KeyGroupPartitioner<>( + heapArrayCopy, + heapArrayCopy.length, + partitioningOutput, + keyGroupRange, + totalKeyGroups, + keyExtractor, + elementSerializer::serialize); + + partitionedSnapshot = keyGroupPartitioner.partitionByKeyGroup(); + } + + return partitionedSnapshot; + } + + @Override + public void release() { + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java deleted file mode 100644 index 09f5a14..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java +++ /dev/null @@ -1,413 +0,0 @@ -/* - * Licensed to the Apache 
Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.operators; - -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.KeyGroupRangeAssignment; -import org.apache.flink.runtime.state.StateSnapshot; - -import javax.annotation.Nonnegative; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.Set; - -import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE; -import static org.apache.flink.util.Preconditions.checkArgument; - -/** - * A heap-based priority queue for internal timers. This heap is supported by hash sets for fast contains - * (de-duplication) and deletes. The heap implementation is a simple binary tree stored inside an array. Element indexes - * in the heap array start at 1 instead of 0 to make array index computations a bit simpler in the hot methods. 
- * - * <p>Possible future improvements: - * <ul> - * <li>We could also implement shrinking for the heap and the deduplication maps.</li> - * <li>We could replace the deduplication maps with more efficient custom implementations. In particular, a hash set - * would be enough if it could return existing elements on unsuccessful adding, etc..</li> - * </ul> - * - * @param <K> type of the key of the internal timers managed by this priority queue. - * @param <N> type of the namespace of the internal timers managed by this priority queue. - */ -public class InternalTimerHeap<K, N> implements Iterable<InternalTimer<K, N>> { - - /** - * Comparator for {@link TimerHeapInternalTimer}, based on the timestamp in ascending order. - */ - private static final Comparator<TimerHeapInternalTimer<?, ?>> COMPARATOR = - (o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp()); - - /** - * This array contains one hash set per key-group. The sets are used for fast de-duplication and deletes of timers. - */ - private final HashMap<TimerHeapInternalTimer<K, N>, TimerHeapInternalTimer<K, N>>[] deduplicationMapsByKeyGroup; - - /** - * The array that represents the heap-organized priority queue. - */ - private TimerHeapInternalTimer<K, N>[] queue; - - /** - * The current size of the priority queue. - */ - private int size; - - /** - * The key-group range of timers that are managed by this queue. - */ - private final KeyGroupRange keyGroupRange; - - /** - * The total number of key-groups of the job. - */ - private final int totalNumberOfKeyGroups; - - - /** - * Creates an empty {@link InternalTimerHeap} with the requested initial capacity. - * - * @param minimumCapacity the minimum and initial capacity of this priority queue. 
- */ - @SuppressWarnings("unchecked") - InternalTimerHeap( - @Nonnegative int minimumCapacity, - @Nonnull KeyGroupRange keyGroupRange, - @Nonnegative int totalNumberOfKeyGroups) { - - this.totalNumberOfKeyGroups = totalNumberOfKeyGroups; - this.keyGroupRange = keyGroupRange; - - final int keyGroupsInLocalRange = keyGroupRange.getNumberOfKeyGroups(); - final int deduplicationSetSize = 1 + minimumCapacity / keyGroupsInLocalRange; - this.deduplicationMapsByKeyGroup = new HashMap[keyGroupsInLocalRange]; - for (int i = 0; i < keyGroupsInLocalRange; ++i) { - deduplicationMapsByKeyGroup[i] = new HashMap<>(deduplicationSetSize); - } - - this.queue = new TimerHeapInternalTimer[1 + minimumCapacity]; - } - - @Nullable - public InternalTimer<K, N> poll() { - return size() > 0 ? removeElementAtIndex(1) : null; - } - - @Nullable - public InternalTimer<K, N> peek() { - return size() > 0 ? queue[1] : null; - } - - /** - * Adds a new timer with the given timestamp, key, and namespace to the heap, if an identical timer was not yet - * registered. - * - * @param timestamp the timer timestamp. - * @param key the timer key. - * @param namespace the timer namespace. - * @return true iff a new timer with given timestamp, key, and namespace was added to the heap. - */ - public boolean scheduleTimer(long timestamp, @Nonnull K key, @Nonnull N namespace) { - return addInternal(new TimerHeapInternalTimer<>(timestamp, key, namespace)); - } - - /** - * Stops timer with the given timestamp, key, and namespace by removing it from the heap, if it exists on the heap. - * - * @param timestamp the timer timestamp. - * @param key the timer key. - * @param namespace the timer namespace. - * @return true iff a timer with given timestamp, key, and namespace was found and removed from the heap. 
- */ - public boolean stopTimer(long timestamp, @Nonnull K key, @Nonnull N namespace) { - return removeInternal(new TimerHeapInternalTimer<>(timestamp, key, namespace)); - } - - public boolean isEmpty() { - return size() == 0; - } - - @Nonnegative - public int size() { - return size; - } - - public void clear() { - Arrays.fill(queue, null); - for (HashMap<TimerHeapInternalTimer<K, N>, TimerHeapInternalTimer<K, N>> timerHashMap : - deduplicationMapsByKeyGroup) { - timerHashMap.clear(); - } - size = 0; - } - - @SuppressWarnings({"unchecked"}) - @Nonnull - public InternalTimer<K, N>[] toArray() { - return (InternalTimer<K, N>[]) Arrays.copyOfRange(queue, 1, size + 1, queue.getClass()); - } - - /** - * Returns an iterator over the elements in this queue. The iterator - * does not return the elements in any particular order. - * - * @return an iterator over the elements in this queue. - */ - @Nonnull - public Iterator<InternalTimer<K, N>> iterator() { - return new InternalTimerPriorityQueueIterator(); - } - - /** - * This method adds all the given timers to the heap. - */ - void bulkAddRestoredTimers(Collection<? extends InternalTimer<K, N>> restoredTimers) { - - if (restoredTimers == null) { - return; - } - - resizeForBulkLoad(restoredTimers.size()); - - for (InternalTimer<K, N> timer : restoredTimers) { - if (timer instanceof TimerHeapInternalTimer) { - addInternal((TimerHeapInternalTimer<K, N>) timer); - } else { - scheduleTimer(timer.getTimestamp(), timer.getKey(), timer.getNamespace()); - } - } - } - - /** - * Returns an unmodifiable set of all timers in the given key-group. 
- */ - @Nonnull - Set<InternalTimer<K, N>> getTimersForKeyGroup(@Nonnegative int keyGroupIdx) { - return Collections.unmodifiableSet(getDedupMapForKeyGroup(keyGroupIdx).keySet()); - } - - @VisibleForTesting - @SuppressWarnings("unchecked") - @Nonnull - List<Set<InternalTimer<K, N>>> getTimersByKeyGroup() { - List<Set<InternalTimer<K, N>>> result = new ArrayList<>(deduplicationMapsByKeyGroup.length); - for (int i = 0; i < deduplicationMapsByKeyGroup.length; ++i) { - result.add(i, Collections.unmodifiableSet(deduplicationMapsByKeyGroup[i].keySet())); - } - return result; - } - - @Nonnull - StateSnapshot snapshot(TimerHeapInternalTimer.TimerSerializer<K, N> serializer) { - return new InternalTimerHeapSnapshot<>( - Arrays.copyOfRange(queue, 1, size + 1), - serializer, - keyGroupRange, - totalNumberOfKeyGroups); - } - - private boolean addInternal(TimerHeapInternalTimer<K, N> timer) { - - if (getDedupMapForTimer(timer).putIfAbsent(timer, timer) == null) { - final int newSize = increaseSizeByOne(); - moveElementToIdx(timer, newSize); - siftUp(newSize); - return true; - } else { - return false; - } - } - - private boolean removeInternal(TimerHeapInternalTimer<K, N> timerToRemove) { - - TimerHeapInternalTimer<K, N> storedTimer = getDedupMapForTimer(timerToRemove).remove(timerToRemove); - - if (storedTimer != null) { - removeElementAtIndex(storedTimer.getTimerHeapIndex()); - return true; - } - - return false; - } - - private TimerHeapInternalTimer<K, N> removeElementAtIndex(int removeIdx) { - TimerHeapInternalTimer<K, N>[] heap = this.queue; - TimerHeapInternalTimer<K, N> removedValue = heap[removeIdx]; - - assert removedValue.getTimerHeapIndex() == removeIdx; - - final int oldSize = size; - - if (removeIdx != oldSize) { - TimerHeapInternalTimer<K, N> timer = heap[oldSize]; - moveElementToIdx(timer, removeIdx); - siftDown(removeIdx); - if (heap[removeIdx] == timer) { - siftUp(removeIdx); - } - } - - heap[oldSize] = null; - 
getDedupMapForTimer(removedValue).remove(removedValue); - - --size; - return removedValue; - } - - private void siftUp(int idx) { - final TimerHeapInternalTimer<K, N>[] heap = this.queue; - final TimerHeapInternalTimer<K, N> currentTimer = heap[idx]; - int parentIdx = idx >>> 1; - - while (parentIdx > 0 && isTimerLessThen(currentTimer, heap[parentIdx])) { - moveElementToIdx(heap[parentIdx], idx); - idx = parentIdx; - parentIdx >>>= 1; - } - - moveElementToIdx(currentTimer, idx); - } - - private void siftDown(int idx) { - final TimerHeapInternalTimer<K, N>[] heap = this.queue; - final int heapSize = this.size; - - final TimerHeapInternalTimer<K, N> currentTimer = heap[idx]; - int firstChildIdx = idx << 1; - int secondChildIdx = firstChildIdx + 1; - - if (isTimerIndexValid(secondChildIdx, heapSize) && - isTimerLessThen(heap[secondChildIdx], heap[firstChildIdx])) { - firstChildIdx = secondChildIdx; - } - - while (isTimerIndexValid(firstChildIdx, heapSize) && - isTimerLessThen(heap[firstChildIdx], currentTimer)) { - moveElementToIdx(heap[firstChildIdx], idx); - idx = firstChildIdx; - firstChildIdx = idx << 1; - secondChildIdx = firstChildIdx + 1; - - if (isTimerIndexValid(secondChildIdx, heapSize) && - isTimerLessThen(heap[secondChildIdx], heap[firstChildIdx])) { - firstChildIdx = secondChildIdx; - } - } - - moveElementToIdx(currentTimer, idx); - } - - private HashMap<TimerHeapInternalTimer<K, N>, TimerHeapInternalTimer<K, N>> getDedupMapForKeyGroup( - @Nonnegative int keyGroupIdx) { - return deduplicationMapsByKeyGroup[globalKeyGroupToLocalIndex(keyGroupIdx)]; - } - - private boolean isTimerIndexValid(int timerIndex, int heapSize) { - return timerIndex <= heapSize; - } - - private boolean isTimerLessThen(TimerHeapInternalTimer<K, N> a, TimerHeapInternalTimer<K, N> b) { - return COMPARATOR.compare(a, b) < 0; - } - - private void moveElementToIdx(TimerHeapInternalTimer<K, N> element, int idx) { - queue[idx] = element; - element.setTimerHeapIndex(idx); - } - - private 
HashMap<TimerHeapInternalTimer<K, N>, TimerHeapInternalTimer<K, N>> getDedupMapForTimer( - InternalTimer<K, N> timer) { - int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(timer.getKey(), totalNumberOfKeyGroups); - return getDedupMapForKeyGroup(keyGroup); - } - - private int globalKeyGroupToLocalIndex(int keyGroup) { - checkArgument(keyGroupRange.contains(keyGroup)); - return keyGroup - keyGroupRange.getStartKeyGroup(); - } - - private int increaseSizeByOne() { - final int oldArraySize = queue.length; - final int minRequiredNewSize = ++size; - if (minRequiredNewSize >= oldArraySize) { - final int grow = (oldArraySize < 64) ? oldArraySize + 2 : oldArraySize >> 1; - resizeQueueArray(oldArraySize + grow, minRequiredNewSize); - } - // TODO implement shrinking as well? - return minRequiredNewSize; - } - - private void resizeForBulkLoad(int totalSize) { - if (totalSize > queue.length) { - int desiredSize = totalSize + (totalSize >>> 3); - resizeQueueArray(desiredSize, totalSize); - } - } - - private void resizeQueueArray(int desiredSize, int minRequiredSize) { - if (isValidArraySize(desiredSize)) { - queue = Arrays.copyOf(queue, desiredSize); - } else if (isValidArraySize(minRequiredSize)) { - queue = Arrays.copyOf(queue, MAX_ARRAY_SIZE); - } else { - throw new OutOfMemoryError("Required minimum timer heap size " + minRequiredSize + - " exceeds maximum size of " + MAX_ARRAY_SIZE + "."); - } - } - - private static boolean isValidArraySize(int size) { - return size >= 0 && size <= MAX_ARRAY_SIZE; - } - - /** - * {@link Iterator} implementation for {@link InternalTimerPriorityQueueIterator}. - * {@link Iterator#remove()} is not supported. 
- */ - private class InternalTimerPriorityQueueIterator implements Iterator<InternalTimer<K, N>> { - - private int iterationIdx; - - InternalTimerPriorityQueueIterator() { - this.iterationIdx = 0; - } - - @Override - public boolean hasNext() { - return iterationIdx < size; - } - - @Override - public InternalTimer<K, N> next() { - if (iterationIdx >= size) { - throw new NoSuchElementException("Iterator has no next element."); - } - return queue[++iterationIdx]; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeapSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeapSnapshot.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeapSnapshot.java deleted file mode 100644 index 6eb4057..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeapSnapshot.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.api.operators; - -import org.apache.flink.runtime.state.KeyGroupPartitioner; -import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.StateSnapshot; - -import javax.annotation.Nonnegative; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -/** - * This class represents the snapshot of an {@link InternalTimerHeap}. - * - * @param <K> type of key. - * @param <N> type of namespace. - */ -public class InternalTimerHeapSnapshot<K, N> implements StateSnapshot { - - /** Copy of the heap array containing all the (immutable timers). */ - @Nonnull - private final TimerHeapInternalTimer<K, N>[] timerHeapArrayCopy; - - /** The timer serializer. */ - @Nonnull - private final TimerHeapInternalTimer.TimerSerializer<K, N> timerSerializer; - - /** The key-group range covered by this snapshot. */ - @Nonnull - private final KeyGroupRange keyGroupRange; - - /** The total number of key-groups in the job. */ - @Nonnegative - private final int totalKeyGroups; - - /** Result of partitioning the snapshot by key-group. 
*/ - @Nullable - private KeyGroupPartitionedSnapshot partitionedSnapshot; - - InternalTimerHeapSnapshot( - @Nonnull TimerHeapInternalTimer<K, N>[] timerHeapArrayCopy, - @Nonnull TimerHeapInternalTimer.TimerSerializer<K, N> timerSerializer, - @Nonnull KeyGroupRange keyGroupRange, - @Nonnegative int totalKeyGroups) { - - this.timerHeapArrayCopy = timerHeapArrayCopy; - this.timerSerializer = timerSerializer; - this.keyGroupRange = keyGroupRange; - this.totalKeyGroups = totalKeyGroups; - } - - @SuppressWarnings("unchecked") - @Nonnull - @Override - public KeyGroupPartitionedSnapshot partitionByKeyGroup() { - - if (partitionedSnapshot == null) { - - TimerHeapInternalTimer<K, N>[] partitioningOutput = new TimerHeapInternalTimer[timerHeapArrayCopy.length]; - - KeyGroupPartitioner<TimerHeapInternalTimer<K, N>> timerPartitioner = - new KeyGroupPartitioner<>( - timerHeapArrayCopy, - timerHeapArrayCopy.length, - partitioningOutput, - keyGroupRange, - totalKeyGroups, - TimerHeapInternalTimer::getKey, - timerSerializer::serialize); - - partitionedSnapshot = timerPartitioner.partitionByKeyGroup(); - } - - return partitionedSnapshot; - } - - @Override - public void release() { - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshot.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshot.java index ddaee6b..bedd4e0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshot.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshot.java @@ -37,8 +37,8 @@ public class InternalTimersSnapshot<K, N> { private TypeSerializer<N> 
namespaceSerializer; private TypeSerializerConfigSnapshot namespaceSerializerConfigSnapshot; - private Set<InternalTimer<K, N>> eventTimeTimers; - private Set<InternalTimer<K, N>> processingTimeTimers; + private Set<TimerHeapInternalTimer<K, N>> eventTimeTimers; + private Set<TimerHeapInternalTimer<K, N>> processingTimeTimers; /** Empty constructor used when restoring the timers. */ public InternalTimersSnapshot() {} @@ -49,8 +49,8 @@ public class InternalTimersSnapshot<K, N> { TypeSerializerConfigSnapshot keySerializerConfigSnapshot, TypeSerializer<N> namespaceSerializer, TypeSerializerConfigSnapshot namespaceSerializerConfigSnapshot, - @Nullable Set<InternalTimer<K, N>> eventTimeTimers, - @Nullable Set<InternalTimer<K, N>> processingTimeTimers) { + @Nullable Set<TimerHeapInternalTimer<K, N>> eventTimeTimers, + @Nullable Set<TimerHeapInternalTimer<K, N>> processingTimeTimers) { this.keySerializer = Preconditions.checkNotNull(keySerializer); this.keySerializerConfigSnapshot = Preconditions.checkNotNull(keySerializerConfigSnapshot); @@ -92,19 +92,19 @@ public class InternalTimersSnapshot<K, N> { this.namespaceSerializerConfigSnapshot = namespaceSerializerConfigSnapshot; } - public Set<InternalTimer<K, N>> getEventTimeTimers() { + public Set<TimerHeapInternalTimer<K, N>> getEventTimeTimers() { return eventTimeTimers; } - public void setEventTimeTimers(Set<InternalTimer<K, N>> eventTimeTimers) { + public void setEventTimeTimers(Set<TimerHeapInternalTimer<K, N>> eventTimeTimers) { this.eventTimeTimers = eventTimeTimers; } - public Set<InternalTimer<K, N>> getProcessingTimeTimers() { + public Set<TimerHeapInternalTimer<K, N>> getProcessingTimeTimers() { return processingTimeTimers; } - public void setProcessingTimeTimers(Set<InternalTimer<K, N>> processingTimeTimers) { + public void setProcessingTimeTimers(Set<TimerHeapInternalTimer<K, N>> processingTimeTimers) { this.processingTimeTimers = processingTimeTimers; } 
http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java index 05b77a7..dbb74e5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java @@ -101,10 +101,10 @@ public class InternalTimersSnapshotReaderWriters { timersSnapshot.getNamespaceSerializer()); // write the event time timers - Set<InternalTimer<K, N>> eventTimers = timersSnapshot.getEventTimeTimers(); + Set<TimerHeapInternalTimer<K, N>> eventTimers = timersSnapshot.getEventTimeTimers(); if (eventTimers != null) { out.writeInt(eventTimers.size()); - for (InternalTimer<K, N> eventTimer : eventTimers) { + for (TimerHeapInternalTimer<K, N> eventTimer : eventTimers) { timerSerializer.serialize(eventTimer, out); } } else { @@ -112,10 +112,10 @@ public class InternalTimersSnapshotReaderWriters { } // write the processing time timers - Set<InternalTimer<K, N>> processingTimers = timersSnapshot.getProcessingTimeTimers(); + Set<TimerHeapInternalTimer<K, N>> processingTimers = timersSnapshot.getProcessingTimeTimers(); if (processingTimers != null) { out.writeInt(processingTimers.size()); - for (InternalTimer<K, N> processingTimer : processingTimers) { + for (TimerHeapInternalTimer<K, N> processingTimer : processingTimers) { timerSerializer.serialize(processingTimer, out); } } else { @@ -222,10 +222,10 @@ public class InternalTimersSnapshotReaderWriters { // read the event time timers int 
sizeOfEventTimeTimers = in.readInt(); - Set<InternalTimer<K, N>> restoredEventTimers = new HashSet<>(sizeOfEventTimeTimers); + Set<TimerHeapInternalTimer<K, N>> restoredEventTimers = new HashSet<>(sizeOfEventTimeTimers); if (sizeOfEventTimeTimers > 0) { for (int i = 0; i < sizeOfEventTimeTimers; i++) { - InternalTimer<K, N> timer = timerSerializer.deserialize(in); + TimerHeapInternalTimer<K, N> timer = timerSerializer.deserialize(in); restoredEventTimers.add(timer); } } @@ -233,10 +233,10 @@ public class InternalTimersSnapshotReaderWriters { // read the processing time timers int sizeOfProcessingTimeTimers = in.readInt(); - Set<InternalTimer<K, N>> restoredProcessingTimers = new HashSet<>(sizeOfProcessingTimeTimers); + Set<TimerHeapInternalTimer<K, N>> restoredProcessingTimers = new HashSet<>(sizeOfProcessingTimeTimers); if (sizeOfProcessingTimeTimers > 0) { for (int i = 0; i < sizeOfProcessingTimeTimers; i++) { - InternalTimer<K, N> timer = timerSerializer.deserialize(in); + TimerHeapInternalTimer<K, N> timer = timerSerializer.deserialize(in); restoredProcessingTimers.add(timer); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java index 906b090..bd821c4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java @@ -19,35 +19,48 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import 
org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.state.KeyExtractorFunction; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueSet; import javax.annotation.Nonnull; import java.io.IOException; +import java.util.Comparator; /** - * Implementation of {@link InternalTimer} for the {@link InternalTimerHeap}. + * Implementation of {@link InternalTimer} to use with a {@link HeapPriorityQueueSet}. * * @param <K> Type of the keys to which timers are scoped. * @param <N> Type of the namespace to which timers are scoped. */ @Internal -public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N> { +public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N>, HeapPriorityQueueElement { - /** The index that indicates that a tracked internal timer is not tracked. */ - private static final int NOT_MANAGED_BY_TIMER_QUEUE_INDEX = Integer.MIN_VALUE; + /** Function to extract the key from a {@link TimerHeapInternalTimer}. */ + private static final KeyExtractorFunction<TimerHeapInternalTimer<?, ?>> KEY_EXTRACTOR_FUNCTION = + TimerHeapInternalTimer::getKey; + /** Function to compare instances of {@link TimerHeapInternalTimer}. */ + private static final Comparator<TimerHeapInternalTimer<?, ?>> TIMER_COMPARATOR = + (o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp()); + + /** The key for which the timer is scoped. */ @Nonnull private final K key; + /** The namespace for which the timer is scoped. */ @Nonnull private final N namespace; + /** The expiration timestamp. 
*/ private final long timestamp; /** @@ -60,7 +73,7 @@ public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N> { this.timestamp = timestamp; this.key = key; this.namespace = namespace; - this.timerHeapIndex = NOT_MANAGED_BY_TIMER_QUEUE_INDEX; + this.timerHeapIndex = NOT_CONTAINED; } @Override @@ -96,19 +109,14 @@ public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N> { return false; } - /** - * Returns the current index of this timer in the owning timer heap. - */ - int getTimerHeapIndex() { + @Override + public int getInternalIndex() { return timerHeapIndex; } - /** - * Sets the current index of this timer in the owning timer heap and should only be called by the managing heap. - * @param timerHeapIndex the new index in the timer heap. - */ - void setTimerHeapIndex(int timerHeapIndex) { - this.timerHeapIndex = timerHeapIndex; + @Override + public void setInternalIndex(int newIndex) { + this.timerHeapIndex = newIndex; } /** @@ -116,7 +124,7 @@ public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N> { * removed. */ void removedFromTimerQueue() { - setTimerHeapIndex(NOT_MANAGED_BY_TIMER_QUEUE_INDEX); + setInternalIndex(NOT_CONTAINED); } @Override @@ -136,10 +144,22 @@ public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N> { '}'; } + @VisibleForTesting + @SuppressWarnings("unchecked") + static <T extends TimerHeapInternalTimer> Comparator<T> getTimerComparator() { + return (Comparator<T>) TIMER_COMPARATOR; + } + + @SuppressWarnings("unchecked") + @VisibleForTesting + static <T extends TimerHeapInternalTimer> KeyExtractorFunction<T> getKeyExtractorFunction() { + return (KeyExtractorFunction<T>) KEY_EXTRACTOR_FUNCTION; + } + /** * A {@link TypeSerializer} used to serialize/deserialize a {@link TimerHeapInternalTimer}. 
*/ - public static class TimerSerializer<K, N> extends TypeSerializer<InternalTimer<K, N>> { + public static class TimerSerializer<K, N> extends TypeSerializer<TimerHeapInternalTimer<K, N>> { private static final long serialVersionUID = 1119562170939152304L; @@ -160,7 +180,7 @@ public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N> { } @Override - public TypeSerializer<InternalTimer<K, N>> duplicate() { + public TypeSerializer<TimerHeapInternalTimer<K, N>> duplicate() { final TypeSerializer<K> keySerializerDuplicate = keySerializer.duplicate(); final TypeSerializer<N> namespaceSerializerDuplicate = namespaceSerializer.duplicate(); @@ -176,17 +196,17 @@ public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N> { } @Override - public InternalTimer<K, N> createInstance() { + public TimerHeapInternalTimer<K, N> createInstance() { throw new UnsupportedOperationException(); } @Override - public InternalTimer<K, N> copy(InternalTimer<K, N> from) { + public TimerHeapInternalTimer<K, N> copy(TimerHeapInternalTimer<K, N> from) { return new TimerHeapInternalTimer<>(from.getTimestamp(), from.getKey(), from.getNamespace()); } @Override - public InternalTimer<K, N> copy(InternalTimer<K, N> from, InternalTimer<K, N> reuse) { + public TimerHeapInternalTimer<K, N> copy(TimerHeapInternalTimer<K, N> from, TimerHeapInternalTimer<K, N> reuse) { return copy(from); } @@ -197,14 +217,14 @@ public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N> { } @Override - public void serialize(InternalTimer<K, N> record, DataOutputView target) throws IOException { + public void serialize(TimerHeapInternalTimer<K, N> record, DataOutputView target) throws IOException { keySerializer.serialize(record.getKey(), target); namespaceSerializer.serialize(record.getNamespace(), target); LongSerializer.INSTANCE.serialize(record.getTimestamp(), target); } @Override - public InternalTimer<K, N> deserialize(DataInputView source) throws IOException 
{ + public TimerHeapInternalTimer<K, N> deserialize(DataInputView source) throws IOException { K key = keySerializer.deserialize(source); N namespace = namespaceSerializer.deserialize(source); Long timestamp = LongSerializer.INSTANCE.deserialize(source); @@ -212,7 +232,7 @@ public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N> { } @Override - public InternalTimer<K, N> deserialize(InternalTimer<K, N> reuse, DataInputView source) throws IOException { + public TimerHeapInternalTimer<K, N> deserialize(TimerHeapInternalTimer<K, N> reuse, DataInputView source) throws IOException { return deserialize(source); } @@ -247,7 +267,7 @@ public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N> { } @Override - public CompatibilityResult<InternalTimer<K, N>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + public CompatibilityResult<TimerHeapInternalTimer<K, N>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { throw new UnsupportedOperationException("This serializer is not registered for managed state."); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java index 1ce400d..b008fa2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java @@ -104,7 +104,7 @@ public class HeapInternalTimerServiceTest { int endKeyGroupIdx = totalNoOfKeyGroups - 1; // we have 0 to 99 @SuppressWarnings("unchecked") - 
Set<InternalTimer<Integer, String>>[] expectedNonEmptyTimerSets = new HashSet[totalNoOfKeyGroups]; + Set<TimerHeapInternalTimer<Integer, String>>[] expectedNonEmptyTimerSets = new HashSet[totalNoOfKeyGroups]; TestKeyContext keyContext = new TestKeyContext(); HeapInternalTimerService<Integer, String> timerService = @@ -123,7 +123,7 @@ public class HeapInternalTimerServiceTest { int keyGroupIdx = KeyGroupRangeAssignment.assignToKeyGroup(timer.getKey(), totalNoOfKeyGroups); // add it in the adequate expected set of timers per keygroup - Set<InternalTimer<Integer, String>> timerSet = expectedNonEmptyTimerSets[keyGroupIdx]; + Set<TimerHeapInternalTimer<Integer, String>> timerSet = expectedNonEmptyTimerSets[keyGroupIdx]; if (timerSet == null) { timerSet = new HashSet<>(); expectedNonEmptyTimerSets[keyGroupIdx] = timerSet; @@ -136,16 +136,16 @@ public class HeapInternalTimerServiceTest { timerService.registerProcessingTimeTimer(timer.getNamespace(), timer.getTimestamp()); } - List<Set<InternalTimer<Integer, String>>> eventTimeTimers = + List<Set<TimerHeapInternalTimer<Integer, String>>> eventTimeTimers = timerService.getEventTimeTimersPerKeyGroup(); - List<Set<InternalTimer<Integer, String>>> processingTimeTimers = + List<Set<TimerHeapInternalTimer<Integer, String>>> processingTimeTimers = timerService.getProcessingTimeTimersPerKeyGroup(); // finally verify that the actual timers per key group sets are the expected ones. 
for (int i = 0; i < expectedNonEmptyTimerSets.length; i++) { - Set<InternalTimer<Integer, String>> expected = expectedNonEmptyTimerSets[i]; - Set<InternalTimer<Integer, String>> actualEvent = eventTimeTimers.get(i); - Set<InternalTimer<Integer, String>> actualProcessing = processingTimeTimers.get(i); + Set<TimerHeapInternalTimer<Integer, String>> expected = expectedNonEmptyTimerSets[i]; + Set<TimerHeapInternalTimer<Integer, String>> actualEvent = eventTimeTimers.get(i); + Set<TimerHeapInternalTimer<Integer, String>> actualProcessing = processingTimeTimers.get(i); if (expected == null) { Assert.assertTrue(actualEvent.isEmpty());