This is an automated email from the ASF dual-hosted git repository. liyu pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit b8ddbef9a5cc5dc769ba61bd5019dd96843c932f Author: Yun Tang <myas...@live.com> AuthorDate: Fri Jun 5 01:16:27 2020 +0800 [FLINK-17800][roksdb] Ensure total order seek to avoid user misuse --- .../state/RocksDBCachingPriorityQueueSet.java | 8 +- .../state/RocksDBIncrementalCheckpointUtils.java | 4 +- .../streaming/state/RocksDBKeyedStateBackend.java | 23 +++- .../state/RocksDBKeyedStateBackendBuilder.java | 17 ++- .../contrib/streaming/state/RocksDBMapState.java | 6 +- .../streaming/state/RocksDBOperationUtils.java | 15 ++- .../state/RocksDBPriorityQueueSetFactory.java | 9 +- .../RocksDBIncrementalRestoreOperation.java | 7 +- ...rtitionedPriorityQueueWithRocksDBStoreTest.java | 1 + .../contrib/streaming/state/RocksDBResource.java | 4 +- .../state/RocksDBRocksStateKeysIteratorTest.java | 2 +- .../state/RocksDBStateBackendConfigTest.java | 28 +--- .../state/RocksDBStateMisuseOptionTest.java | 147 +++++++++++++++++++++ .../contrib/streaming/state/RocksDBTestUtils.java | 21 +++ ...RocksKeyGroupsRocksSingleStateIteratorTest.java | 6 +- .../state/benchmark/RocksDBPerformanceTest.java | 3 +- .../api/operators/TimerHeapInternalTimer.java | 2 +- 17 files changed, 256 insertions(+), 47 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java index 364185a..fb9a833 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java @@ -29,6 +29,7 @@ import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.shaded.guava18.com.google.common.primitives.UnsignedBytes; import 
org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; @@ -63,6 +64,9 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement> @Nonnull private final RocksDB db; + @Nonnull + private final ReadOptions readOptions; + /** Handle to the column family of the RocksDB instance in which the elements are stored. */ @Nonnull private final ColumnFamilyHandle columnFamilyHandle; @@ -112,6 +116,7 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement> @Nonnegative int keyGroupId, @Nonnegative int keyGroupPrefixBytes, @Nonnull RocksDB db, + @Nonnull ReadOptions readOptions, @Nonnull ColumnFamilyHandle columnFamilyHandle, @Nonnull TypeSerializer<E> byteOrderProducingSerializer, @Nonnull DataOutputSerializer outputStream, @@ -119,6 +124,7 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement> @Nonnull RocksDBWriteBatchWrapper batchWrapper, @Nonnull OrderedByteArraySetCache orderedByteArraySetCache) { this.db = db; + this.readOptions = readOptions; this.columnFamilyHandle = columnFamilyHandle; this.byteOrderProducingSerializer = byteOrderProducingSerializer; this.batchWrapper = batchWrapper; @@ -304,7 +310,7 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement> flushWriteBatch(); return new RocksBytesIterator( new RocksIteratorWrapper( - db.newIterator(columnFamilyHandle))); + db.newIterator(columnFamilyHandle, readOptions))); } /** diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java index 1f43dd0..5bce695 100644 --- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java @@ -21,6 +21,7 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; @@ -116,7 +117,8 @@ public class RocksDBIncrementalCheckpointUtils { @Nonnegative long writeBatchSize) throws RocksDBException { for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { - try (RocksIteratorWrapper iteratorWrapper = RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandle); + try (ReadOptions readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions(); + RocksIteratorWrapper iteratorWrapper = RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandle, readOptions); RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeBatchSize)) { iteratorWrapper.seek(beginKeyBytes); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 2ddb79b..505bd2a 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -66,6 +66,7 @@ import org.apache.flink.util.StateMigrationException; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.ReadOptions; import 
org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.Snapshot; @@ -150,6 +151,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private final WriteOptions writeOptions; /** + * The read options to use when creating iterators. + * We ensure total order seek in case user misuse, see FLINK-17800 for more details. + */ + private final ReadOptions readOptions; + + /** * The max memory size for one batch in {@link RocksDBWriteBatchWrapper}. */ private final long writeBatchSize; @@ -212,6 +219,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, RocksDB db, + WriteOptions writeOptions, + ReadOptions readOptions, LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation, int keyGroupPrefixBytes, CloseableRegistry cancelStreamRegistry, @@ -250,7 +259,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { this.keyGroupPrefixBytes = keyGroupPrefixBytes; this.kvStateInformation = kvStateInformation; - this.writeOptions = new WriteOptions().setDisableWAL(true); + this.writeOptions = writeOptions; + this.readOptions = readOptions; checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative value."); this.writeBatchSize = writeBatchSize; this.db = db; @@ -290,7 +300,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex); } - RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle); + RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle, readOptions); iterator.seekToFirst(); final RocksStateKeysIterator<K> iteratorWrapper = new RocksStateKeysIterator<>(iterator, state, getKeySerializer(), keyGroupPrefixBytes, @@ -360,6 +370,7 @@ public class 
RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { columnFamilyOptions.forEach(IOUtils::closeQuietly); IOUtils.closeQuietly(optionsContainer); + IOUtils.closeQuietly(readOptions); IOUtils.closeQuietly(writeOptions); ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories(); @@ -407,6 +418,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { return writeOptions; } + public ReadOptions getReadOptions() { + return readOptions; + } + RocksDBSerializedCompositeKeyBuilder<K> getSharedRocksKeyBuilder() { return sharedRocksKeyBuilder; } @@ -599,7 +614,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { Snapshot rocksDBSnapshot = db.getSnapshot(); try ( - RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, stateMetaInfo.f0); + RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, stateMetaInfo.f0, readOptions); RocksDBWriteBatchWrapper batchWriter = new RocksDBWriteBatchWrapper(db, getWriteOptions(), getWriteBatchSize()) ) { iterator.seekToFirst(); @@ -674,7 +689,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { for (RocksDbKvStateInfo metaInfo : kvStateInformation.values()) { //TODO maybe filterOrTransform only for k/v states - try (RocksIteratorWrapper rocksIterator = RocksDBOperationUtils.getRocksIterator(db, metaInfo.columnFamilyHandle)) { + try (RocksIteratorWrapper rocksIterator = RocksDBOperationUtils.getRocksIterator(db, metaInfo.columnFamilyHandle, readOptions)) { rocksIterator.seekToFirst(); while (rocksIterator.isValid()) { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java index 70c438e..08b4864 100644 --- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java @@ -54,6 +54,7 @@ import org.apache.flink.util.ResourceGuard; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; +import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.WriteOptions; import org.slf4j.Logger; @@ -250,6 +251,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry(); // The write options to use in the states. WriteOptions writeOptions = null; + ReadOptions readOptions = null; LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation = new LinkedHashMap<>(); RocksDB db = null; AbstractRocksDBRestoreOperation restoreOperation = null; @@ -288,6 +290,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken } writeOptions = new WriteOptions().setDisableWAL(true); + readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions(); writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions, writeBatchSize); // it is important that we only create the key builder after the restore, and not before; // restore operations may reconfigure the key serializer, so accessing the key serializer @@ -300,8 +303,13 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken snapshotStrategy = initializeSavepointAndCheckpointStrategies(cancelStreamRegistryForBackend, rocksDBResourceGuard, kvStateInformation, keyGroupPrefixBytes, db, backendUID, materializedSstFiles, lastCompletedCheckpointId); // init priority queue factory - priorityQueueFactory = initPriorityQueueFactory(keyGroupPrefixBytes, kvStateInformation, db, 
- writeBatchWrapper, nativeMetricMonitor); + priorityQueueFactory = initPriorityQueueFactory( + keyGroupPrefixBytes, + kvStateInformation, + db, + readOptions, + writeBatchWrapper, + nativeMetricMonitor); } catch (Throwable e) { // Do clean up List<ColumnFamilyOptions> columnFamilyOptions = new ArrayList<>(kvStateInformation.values().size()); @@ -319,6 +327,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken IOUtils.closeQuietly(restoreOperation); IOUtils.closeAllQuietly(columnFamilyOptions); IOUtils.closeQuietly(optionsContainer); + IOUtils.closeQuietly(readOptions); IOUtils.closeQuietly(writeOptions); ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories(); kvStateInformation.clear(); @@ -350,6 +359,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken this.executionConfig, this.ttlTimeProvider, db, + writeOptions, + readOptions, kvStateInformation, keyGroupPrefixBytes, cancelStreamRegistryForBackend, @@ -478,6 +489,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken int keyGroupPrefixBytes, Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation, RocksDB db, + ReadOptions readOptions, RocksDBWriteBatchWrapper writeBatchWrapper, RocksDBNativeMetricMonitor nativeMetricMonitor) { PriorityQueueSetFactory priorityQueueFactory; @@ -492,6 +504,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken numberOfKeyGroups, kvStateInformation, db, + readOptions, writeBatchWrapper, nativeMetricMonitor, columnFamilyOptionsFactory diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java index ad9281c..7ba3edd 100644 --- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java @@ -243,7 +243,7 @@ class RocksDBMapState<K, N, UK, UV> public boolean isEmpty() { final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace(); - try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily)) { + try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily, backend.getReadOptions())) { iterator.seek(prefixBytes); @@ -254,7 +254,7 @@ class RocksDBMapState<K, N, UK, UV> @Override public void clear() { try { - try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily); + try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily, backend.getReadOptions()); RocksDBWriteBatchWrapper rocksDBWriteBatchWrapper = new RocksDBWriteBatchWrapper(backend.db, backend.getWriteOptions(), backend.getWriteBatchSize())) { final byte[] keyPrefixBytes = serializeCurrentKeyWithGroupAndNamespace(); @@ -577,7 +577,7 @@ class RocksDBMapState<K, N, UK, UV> // use try-with-resources to ensure RocksIterator can be release even some runtime exception // occurred in the below code block. - try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnFamily)) { + try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnFamily, backend.getReadOptions())) { /* * The iteration starts from the prefix bytes at the first loading. 
After #nextEntry() is called, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java index 1455c1b..0f564d5 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java @@ -32,6 +32,7 @@ import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; +import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.slf4j.Logger; @@ -94,12 +95,18 @@ public class RocksDBOperationUtils { return dbRef; } - public static RocksIteratorWrapper getRocksIterator(RocksDB db) { - return new RocksIteratorWrapper(db.newIterator()); + public static RocksIteratorWrapper getRocksIterator(RocksDB db, ColumnFamilyHandle columnFamilyHandle, ReadOptions readOptions) { + return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle, readOptions)); } - public static RocksIteratorWrapper getRocksIterator(RocksDB db, ColumnFamilyHandle columnFamilyHandle) { - return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle)); + /** + * Create a total order read option to avoid user misuse, see FLINK-17800 for more details. + * + * <p>Note, remember to close the generated {@link ReadOptions} when dispose. + */ + // TODO We would remove this method once we bump RocksDB version larger than 6.2.2. 
+ public static ReadOptions createTotalOrderSeekReadOptions() { + return new ReadOptions().setTotalOrderSeek(true); } public static void registerKvStateInformation( diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java index 1cbeebe..59daff8 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java @@ -17,6 +17,7 @@ package org.apache.flink.contrib.streaming.state; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.core.memory.DataInputDeserializer; @@ -36,6 +37,7 @@ import org.apache.flink.util.StateMigrationException; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import javax.annotation.Nonnull; @@ -52,7 +54,8 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory { /** * Default cache size per key-group. */ - private static final int DEFAULT_CACHES_SIZE = 128; //TODO make this configurable + @VisibleForTesting + static final int DEFAULT_CACHES_SIZE = 128; //TODO make this configurable /** * A shared buffer to serialize elements for the priority queue. 
@@ -71,6 +74,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory { private final int numberOfKeyGroups; private final Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation; private final RocksDB db; + private final ReadOptions readOptions; private final RocksDBWriteBatchWrapper writeBatchWrapper; private final RocksDBNativeMetricMonitor nativeMetricMonitor; private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory; @@ -81,6 +85,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory { int numberOfKeyGroups, Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation, RocksDB db, + ReadOptions readOptions, RocksDBWriteBatchWrapper writeBatchWrapper, RocksDBNativeMetricMonitor nativeMetricMonitor, Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory) { @@ -89,6 +94,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory { this.numberOfKeyGroups = numberOfKeyGroups; this.kvStateInformation = kvStateInformation; this.db = db; + this.readOptions = readOptions; this.writeBatchWrapper = writeBatchWrapper; this.nativeMetricMonitor = nativeMetricMonitor; this.columnFamilyOptionsFactory = columnFamilyOptionsFactory; @@ -122,6 +128,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory { keyGroupId, keyGroupPrefixBytes, db, + readOptions, columnFamilyHandle, byteOrderedElementSerializer, sharedElementOutView, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java index f7abed6..53c7537 100644 --- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java @@ -51,6 +51,7 @@ import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; +import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.slf4j.Logger; @@ -309,7 +310,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i)) .columnFamilyHandle; - try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle)) { + try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle, tmpRestoreDBInfo.readOptions)) { iterator.seek(startKeyGroupPrefixBytes); @@ -376,6 +377,8 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor @Nonnull private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots; + private final ReadOptions readOptions; + private RestoredDBInstance( @Nonnull RocksDB db, @Nonnull List<ColumnFamilyHandle> columnFamilyHandles, @@ -386,6 +389,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor this.columnFamilyHandles = columnFamilyHandles; this.columnFamilyDescriptors = columnFamilyDescriptors; this.stateMetaInfoSnapshots = stateMetaInfoSnapshots; + this.readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions(); } @Override @@ -397,6 +401,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor IOUtils.closeAllQuietly(columnFamilyHandles); IOUtils.closeQuietly(db); IOUtils.closeAllQuietly(columnFamilyOptions); + 
IOUtils.closeQuietly(readOptions); } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java index d402c3d..fd68d70 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java @@ -60,6 +60,7 @@ public class KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest extends Intern keyGroupId, keyGroupPrefixBytes, rocksDBResource.getRocksDB(), + rocksDBResource.getReadOptions(), rocksDBResource.getDefaultColumnFamily(), TestElementSerializer.INSTANCE, outputStreamWithPos, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java index d25baa7..3b3b697 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java @@ -101,7 +101,7 @@ public class RocksDBResource extends ExternalResource { LOG.error("Close previous ColumnOptions's instance failed.", e); } - return PredefinedOptions.FLASH_SSD_OPTIMIZED.createColumnOptions(handlesToClose); + return PredefinedOptions.FLASH_SSD_OPTIMIZED.createColumnOptions(handlesToClose).optimizeForPointLookup(40960); } }); } @@ -155,7 +155,7 @@ public class RocksDBResource extends 
ExternalResource { PredefinedOptions.DEFAULT.createColumnOptions(handlesToClose), handlesToClose); this.writeOptions = new WriteOptions(); this.writeOptions.disableWAL(); - this.readOptions = new ReadOptions(); + this.readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions(); this.columnFamilyHandles = new ArrayList<>(1); this.rocksDB = RocksDB.open( dbOptions, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java index 8f71b65..1a4808f 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java @@ -126,7 +126,7 @@ public class RocksDBRocksStateKeysIteratorTest { ColumnFamilyHandle handle = keyedStateBackend.getColumnFamilyHandle(testStateName); try ( - RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(keyedStateBackend.db, handle); + RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(keyedStateBackend.db, handle, keyedStateBackend.getReadOptions()); RocksStateKeysIterator<K> iteratorWrapper = new RocksStateKeysIterator<>( iterator, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index 2167a00..cf1fdfa 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -59,6 +59,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import static org.apache.flink.contrib.streaming.state.RocksDBTestUtils.createKeyedStateBackend; import static org.hamcrest.CoreMatchers.anyOf; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; @@ -121,7 +122,7 @@ public class RocksDBStateBackendConfigTest { assertArrayEquals(new String[] { testDir1, testDir2 }, rocksDbBackend.getDbStoragePaths()); final MockEnvironment env = getMockEnvironment(tempFolder.newFolder()); - final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env); + final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE); try { File instanceBasePath = keyedBackend.getInstanceBasePath(); @@ -158,7 +159,7 @@ public class RocksDBStateBackendConfigTest { RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString()); - RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env); + RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE); Assert.assertEquals(RocksDBPriorityQueueSetFactory.class, keyedBackend.getPriorityQueueFactory().getClass()); keyedBackend.dispose(); @@ -168,7 +169,7 @@ public class RocksDBStateBackendConfigTest { RocksDBStateBackend.PriorityQueueStateType.HEAP.toString()); rocksDbBackend = rocksDbBackend.configure(conf, Thread.currentThread().getContextClassLoader()); - keyedBackend = createKeyedStateBackend(rocksDbBackend, env); + keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE); Assert.assertEquals( HeapPriorityQueueSetFactory.class, 
keyedBackend.getPriorityQueueFactory().getClass()); @@ -197,7 +198,7 @@ public class RocksDBStateBackendConfigTest { final RocksDBStateBackend configuredRocksDBStateBackend = backend.configure( configFromConfFile, Thread.currentThread().getContextClassLoader()); - final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(configuredRocksDBStateBackend, env); + final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(configuredRocksDBStateBackend, env, IntSerializer.INSTANCE); // priorityQueueStateType of the job backend should be preserved assertThat(keyedBackend.getPriorityQueueFactory(), instanceOf(HeapPriorityQueueSetFactory.class)); @@ -254,7 +255,7 @@ public class RocksDBStateBackendConfigTest { rocksDbBackend.setDbStoragePath(configuredPath); final MockEnvironment env = getMockEnvironment(tempFolder.newFolder()); - RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env); + RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE); try { File instanceBasePath = keyedBackend.getInstanceBasePath(); @@ -725,23 +726,6 @@ public class RocksDBStateBackendConfigTest { // Utilities // ------------------------------------------------------------------------ - static RocksDBKeyedStateBackend<Integer> createKeyedStateBackend( - RocksDBStateBackend rocksDbBackend, Environment env) throws Exception { - - return (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.createKeyedStateBackend( - env, - env.getJobID(), - "test_op", - IntSerializer.INSTANCE, - 1, - new KeyGroupRange(0, 0), - env.getTaskKvStateRegistry(), - TtlTimeProvider.DEFAULT, - new UnregisteredMetricsGroup(), - Collections.emptyList(), - new CloseableRegistry()); - } - static MockEnvironment getMockEnvironment(File... 
tempDirs) { final String[] tempDirStrings = new String[tempDirs.length]; for (int i = 0; i < tempDirs.length; i++) { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java new file mode 100644 index 0000000..59a4822 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer; +import org.apache.flink.streaming.api.operators.TimerSerializer; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.DBOptions; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.contrib.streaming.state.RocksDBTestUtils.createKeyedStateBackend; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests covering cases where, even if the user misuses some options, the RocksDB state backend still works as expected or gives explicit feedback. + * + * <p>The RocksDB state backend performs some internal operations through RocksDB's APIs that are transparent to users. + * However, users can still configure options via {@link RocksDBOptionsFactory}, which might cause some of those operations + * to not return the expected result, e.g.
FLINK-17800 + */ +public class RocksDBStateMisuseOptionTest { + + @Rule + public final TemporaryFolder tempFolder = new TemporaryFolder(); + + /** + * Tests to cover case when user misuse optimizeForPointLookup with iterator interfaces on map state. + * + * <p>The option {@link ColumnFamilyOptions#optimizeForPointLookup(long)} would lead to iterator.seek with prefix bytes invalid. + */ + @Test + public void testMisuseOptimizePointLookupWithMapState() throws Exception { + RocksDBStateBackend rocksDBStateBackend = createStateBackendWithOptimizePointLookup(); + RocksDBKeyedStateBackend<Integer> keyedStateBackend = createKeyedStateBackend(rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE); + MapStateDescriptor<Integer, Long> stateDescriptor = new MapStateDescriptor<>("map", IntSerializer.INSTANCE, LongSerializer.INSTANCE); + MapState<Integer, Long> mapState = keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor); + + keyedStateBackend.setCurrentKey(1); + Map<Integer, Long> expectedResult = new HashMap<>(); + for (int i = 0; i < 100; i++) { + long uv = ThreadLocalRandom.current().nextLong(); + mapState.put(i, uv); + expectedResult.put(i, uv); + } + + Iterator<Map.Entry<Integer, Long>> iterator = mapState.entries().iterator(); + while (iterator.hasNext()) { + Map.Entry<Integer, Long> entry = iterator.next(); + assertEquals(entry.getValue(), expectedResult.remove(entry.getKey())); + iterator.remove(); + } + assertTrue(expectedResult.isEmpty()); + assertTrue(mapState.isEmpty()); + } + + /** + * Tests to cover case when user misuse optimizeForPointLookup with peek operations on priority queue. + * + * <p>The option {@link ColumnFamilyOptions#optimizeForPointLookup(long)} would lead to iterator.seek with prefix bytes invalid. 
+ */ + @Test + public void testMisuseOptimizePointLookupWithPriorityQueue() throws IOException { + RocksDBStateBackend rocksDBStateBackend = createStateBackendWithOptimizePointLookup(); + RocksDBKeyedStateBackend<Integer> keyedStateBackend = createKeyedStateBackend(rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE); + KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> priorityQueue = + keyedStateBackend.create("timer", new TimerSerializer<>(keyedStateBackend.getKeySerializer(), VoidNamespaceSerializer.INSTANCE)); + + PriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> expectedPriorityQueue = new PriorityQueue<>((o1, o2) -> (int) (o1.getTimestamp() - o2.getTimestamp())); + // ensure we insert timers more than cache capacity. + int queueSize = RocksDBPriorityQueueSetFactory.DEFAULT_CACHES_SIZE + 42; + List<Integer> timeStamps = IntStream.range(0, queueSize).boxed().collect(Collectors.toList()); + Collections.shuffle(timeStamps); + for (Integer timeStamp : timeStamps) { + TimerHeapInternalTimer<Integer, VoidNamespace> timer = new TimerHeapInternalTimer<>(timeStamp, timeStamp, VoidNamespace.INSTANCE); + priorityQueue.add(timer); + expectedPriorityQueue.add(timer); + } + assertEquals(queueSize, priorityQueue.size()); + TimerHeapInternalTimer<Integer, VoidNamespace> timer; + while ((timer = priorityQueue.poll()) != null) { + assertEquals(expectedPriorityQueue.poll(), timer); + } + assertTrue(expectedPriorityQueue.isEmpty()); + assertTrue(priorityQueue.isEmpty()); + + } + + private RocksDBStateBackend createStateBackendWithOptimizePointLookup() throws IOException { + RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI(), true); + rocksDBStateBackend.setPriorityQueueStateType(RocksDBStateBackend.PriorityQueueStateType.ROCKSDB); + rocksDBStateBackend.setRocksDBOptions(new RocksDBOptionsFactory() { + + private static final long serialVersionUID = 1L; + + 
@Override + public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) { + return currentOptions; + } + + @Override + public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) { + return currentOptions.optimizeForPointLookup(64); + } + }); + return rocksDBStateBackend; + } +} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java index ed44a73..cee56aa 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.state.KeyGroupRange; @@ -35,6 +36,7 @@ import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.RocksDB; import java.io.File; +import java.io.IOException; import java.util.Collections; /** @@ -98,4 +100,23 @@ public final class RocksDBTestUtils { defaultCFHandle, new CloseableRegistry()); } + + public static <K> RocksDBKeyedStateBackend<K> createKeyedStateBackend( + RocksDBStateBackend rocksDbBackend, + Environment env, + TypeSerializer<K> keySerializer) throws IOException { + + return (RocksDBKeyedStateBackend<K>) rocksDbBackend.createKeyedStateBackend( + env, + env.getJobID(), + "test_op", + keySerializer, + 1, 
+ new KeyGroupRange(0, 0), + env.getTaskKvStateRegistry(), + TtlTimeProvider.DEFAULT, + new UnregisteredMetricsGroup(), + Collections.emptyList(), + new CloseableRegistry()); + } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java index 4447f08..2c73fc7 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java @@ -30,6 +30,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import java.io.DataOutputStream; @@ -74,7 +75,8 @@ public class RocksKeyGroupsRocksSingleStateIteratorTest { public void testMergeIterator(int maxParallelism) throws Exception { Random random = new Random(1234); - try (RocksDB rocksDB = RocksDB.open(tempFolder.getRoot().getAbsolutePath())) { + try (ReadOptions readOptions = new ReadOptions(); + RocksDB rocksDB = RocksDB.open(tempFolder.getRoot().getAbsolutePath())) { List<Tuple2<RocksIteratorWrapper, Integer>> rocksIteratorsWithKVStateId = new ArrayList<>(); List<Tuple2<ColumnFamilyHandle, Integer>> columnFamilyHandlesWithKeyCount = new ArrayList<>(); @@ -108,7 +110,7 @@ public class RocksKeyGroupsRocksSingleStateIteratorTest { int id = 0; for (Tuple2<ColumnFamilyHandle, Integer> columnFamilyHandle : columnFamilyHandlesWithKeyCount) { - rocksIteratorsWithKVStateId.add(new Tuple2<>(RocksDBOperationUtils.getRocksIterator(rocksDB, columnFamilyHandle.f0), id)); + 
rocksIteratorsWithKVStateId.add(new Tuple2<>(RocksDBOperationUtils.getRocksIterator(rocksDB, columnFamilyHandle.f0, readOptions), id)); ++id; } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java index 96003d6..ce06d1f 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java @@ -19,7 +19,6 @@ package org.apache.flink.contrib.streaming.state.benchmark; import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend; -import org.apache.flink.contrib.streaming.state.RocksDBOperationUtils; import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper; import org.apache.flink.core.memory.MemoryUtils; import org.apache.flink.testutils.junit.RetryOnFailure; @@ -167,7 +166,7 @@ public class RocksDBPerformanceTest extends TestLogger { int pos = 0; - try (final RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(rocksDB)) { + try (final RocksIteratorWrapper iterator = new RocksIteratorWrapper(rocksDB.newIterator())) { // seek to start unsafe.putInt(keyTemplate, offset, 0); iterator.seek(keyTemplate); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java index a6194ed..e02901b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java @@ -50,7 +50,7 @@ public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N>, */ private transient int timerHeapIndex; - TimerHeapInternalTimer(long timestamp, @Nonnull K key, @Nonnull N namespace) { + public TimerHeapInternalTimer(long timestamp, @Nonnull K key, @Nonnull N namespace) { this.timestamp = timestamp; this.key = key; this.namespace = namespace;