This is an automated email from the ASF dual-hosted git repository. liyu pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 831daaae2c0cdef6871001e856dfd6a54da2a943 Author: Yu Li <l...@apache.org> AuthorDate: Sat Jun 20 16:59:06 2020 +0800 Revert "[FLINK-17800][roksdb] Ensure total order seek to avoid user misuse" This reverts commit b8ddbef9a5cc5dc769ba61bd5019dd96843c932f. --- .../state/RocksDBCachingPriorityQueueSet.java | 8 +- .../state/RocksDBIncrementalCheckpointUtils.java | 4 +- .../streaming/state/RocksDBKeyedStateBackend.java | 23 +--- .../state/RocksDBKeyedStateBackendBuilder.java | 17 +-- .../contrib/streaming/state/RocksDBMapState.java | 6 +- .../streaming/state/RocksDBOperationUtils.java | 15 +-- .../state/RocksDBPriorityQueueSetFactory.java | 9 +- .../RocksDBIncrementalRestoreOperation.java | 7 +- ...rtitionedPriorityQueueWithRocksDBStoreTest.java | 1 - .../contrib/streaming/state/RocksDBResource.java | 4 +- .../state/RocksDBRocksStateKeysIteratorTest.java | 2 +- .../state/RocksDBStateBackendConfigTest.java | 28 +++- .../state/RocksDBStateMisuseOptionTest.java | 147 --------------------- .../contrib/streaming/state/RocksDBTestUtils.java | 21 --- ...RocksKeyGroupsRocksSingleStateIteratorTest.java | 6 +- .../state/benchmark/RocksDBPerformanceTest.java | 3 +- .../api/operators/TimerHeapInternalTimer.java | 2 +- 17 files changed, 47 insertions(+), 256 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java index fb9a833..364185a 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java @@ -29,7 +29,6 @@ import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.shaded.guava18.com.google.common.primitives.UnsignedBytes; import org.rocksdb.ColumnFamilyHandle; -import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; @@ -64,9 +63,6 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement> @Nonnull private final RocksDB db; - @Nonnull - private final ReadOptions readOptions; - /** Handle to the column family of the RocksDB instance in which the elements are stored. */ @Nonnull private final ColumnFamilyHandle columnFamilyHandle; @@ -116,7 +112,6 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement> @Nonnegative int keyGroupId, @Nonnegative int keyGroupPrefixBytes, @Nonnull RocksDB db, - @Nonnull ReadOptions readOptions, @Nonnull ColumnFamilyHandle columnFamilyHandle, @Nonnull TypeSerializer<E> byteOrderProducingSerializer, @Nonnull DataOutputSerializer outputStream, @@ -124,7 +119,6 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement> @Nonnull RocksDBWriteBatchWrapper batchWrapper, @Nonnull OrderedByteArraySetCache orderedByteArraySetCache) { this.db = db; - this.readOptions = readOptions; this.columnFamilyHandle = columnFamilyHandle; this.byteOrderProducingSerializer = byteOrderProducingSerializer; this.batchWrapper = batchWrapper; @@ -310,7 +304,7 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement> flushWriteBatch(); return new RocksBytesIterator( new RocksIteratorWrapper( - db.newIterator(columnFamilyHandle, readOptions))); + db.newIterator(columnFamilyHandle))); } /** diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java index 5bce695..1f43dd0 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java @@ -21,7 +21,6 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; import org.rocksdb.ColumnFamilyHandle; -import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; @@ -117,8 +116,7 @@ public class RocksDBIncrementalCheckpointUtils { @Nonnegative long writeBatchSize) throws RocksDBException { for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { - try (ReadOptions readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions(); - RocksIteratorWrapper iteratorWrapper = RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandle, readOptions); + try (RocksIteratorWrapper iteratorWrapper = RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandle); RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeBatchSize)) { iteratorWrapper.seek(beginKeyBytes); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 505bd2a..2ddb79b 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -66,7 +66,6 @@ import org.apache.flink.util.StateMigrationException; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; -import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.Snapshot; @@ -151,12 +150,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private final WriteOptions writeOptions; /** - * The read options to use when creating iterators. - * We ensure total order seek in case user misuse, see FLINK-17800 for more details. - */ - private final ReadOptions readOptions; - - /** * The max memory size for one batch in {@link RocksDBWriteBatchWrapper}. */ private final long writeBatchSize; @@ -219,8 +212,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, RocksDB db, - WriteOptions writeOptions, - ReadOptions readOptions, LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation, int keyGroupPrefixBytes, CloseableRegistry cancelStreamRegistry, @@ -259,8 +250,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { this.keyGroupPrefixBytes = keyGroupPrefixBytes; this.kvStateInformation = kvStateInformation; - this.writeOptions = writeOptions; - this.readOptions = readOptions; + this.writeOptions = new WriteOptions().setDisableWAL(true); checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative value."); this.writeBatchSize = writeBatchSize; this.db = db; @@ -300,7 +290,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex); } - RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle, readOptions); + RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle); iterator.seekToFirst(); final RocksStateKeysIterator<K> iteratorWrapper = new RocksStateKeysIterator<>(iterator, state, getKeySerializer(), keyGroupPrefixBytes, @@ -370,7 +360,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { columnFamilyOptions.forEach(IOUtils::closeQuietly); IOUtils.closeQuietly(optionsContainer); - IOUtils.closeQuietly(readOptions); IOUtils.closeQuietly(writeOptions); ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories(); @@ -418,10 +407,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { return writeOptions; } - public ReadOptions getReadOptions() { - return readOptions; - } - RocksDBSerializedCompositeKeyBuilder<K> getSharedRocksKeyBuilder() { return sharedRocksKeyBuilder; } @@ -614,7 +599,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { Snapshot rocksDBSnapshot = db.getSnapshot(); try ( - RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, stateMetaInfo.f0, readOptions); + RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, stateMetaInfo.f0); RocksDBWriteBatchWrapper batchWriter = new RocksDBWriteBatchWrapper(db, getWriteOptions(), getWriteBatchSize()) ) { iterator.seekToFirst(); @@ -689,7 +674,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { for (RocksDbKvStateInfo metaInfo : kvStateInformation.values()) { //TODO maybe filterOrTransform only for k/v states - try (RocksIteratorWrapper rocksIterator = RocksDBOperationUtils.getRocksIterator(db, metaInfo.columnFamilyHandle, readOptions)) { + try (RocksIteratorWrapper rocksIterator = RocksDBOperationUtils.getRocksIterator(db, metaInfo.columnFamilyHandle)) { rocksIterator.seekToFirst(); while (rocksIterator.isValid()) { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java index 08b4864..70c438e 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java @@ -54,7 +54,6 @@ import org.apache.flink.util.ResourceGuard; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; -import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.WriteOptions; import org.slf4j.Logger; @@ -251,7 +250,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry(); // The write options to use in the states. WriteOptions writeOptions = null; - ReadOptions readOptions = null; LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation = new LinkedHashMap<>(); RocksDB db = null; AbstractRocksDBRestoreOperation restoreOperation = null; @@ -290,7 +288,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken } writeOptions = new WriteOptions().setDisableWAL(true); - readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions(); writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions, writeBatchSize); // it is important that we only create the key builder after the restore, and not before; // restore operations may reconfigure the key serializer, so accessing the key serializer @@ -303,13 +300,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken snapshotStrategy = initializeSavepointAndCheckpointStrategies(cancelStreamRegistryForBackend, rocksDBResourceGuard, kvStateInformation, keyGroupPrefixBytes, db, backendUID, materializedSstFiles, lastCompletedCheckpointId); // init priority queue factory - priorityQueueFactory = initPriorityQueueFactory( - keyGroupPrefixBytes, - kvStateInformation, - db, - readOptions, - writeBatchWrapper, - nativeMetricMonitor); + priorityQueueFactory = initPriorityQueueFactory(keyGroupPrefixBytes, kvStateInformation, db, + writeBatchWrapper, nativeMetricMonitor); } catch (Throwable e) { // Do clean up List<ColumnFamilyOptions> columnFamilyOptions = new ArrayList<>(kvStateInformation.values().size()); @@ -327,7 +319,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken IOUtils.closeQuietly(restoreOperation); IOUtils.closeAllQuietly(columnFamilyOptions); IOUtils.closeQuietly(optionsContainer); - IOUtils.closeQuietly(readOptions); IOUtils.closeQuietly(writeOptions); ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories(); kvStateInformation.clear(); @@ -359,8 +350,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken this.executionConfig, this.ttlTimeProvider, db, - writeOptions, - readOptions, kvStateInformation, keyGroupPrefixBytes, cancelStreamRegistryForBackend, @@ -489,7 +478,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken int keyGroupPrefixBytes, Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation, RocksDB db, - ReadOptions readOptions, RocksDBWriteBatchWrapper writeBatchWrapper, RocksDBNativeMetricMonitor nativeMetricMonitor) { PriorityQueueSetFactory priorityQueueFactory; @@ -504,7 +492,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken numberOfKeyGroups, kvStateInformation, db, - readOptions, writeBatchWrapper, nativeMetricMonitor, columnFamilyOptionsFactory diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java index 7ba3edd..ad9281c 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java @@ -243,7 +243,7 @@ class RocksDBMapState<K, N, UK, UV> public boolean isEmpty() { final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace(); - try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily, backend.getReadOptions())) { + try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily)) { iterator.seek(prefixBytes); @@ -254,7 +254,7 @@ class RocksDBMapState<K, N, UK, UV> @Override public void clear() { try { - try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily, backend.getReadOptions()); + try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily); RocksDBWriteBatchWrapper rocksDBWriteBatchWrapper = new RocksDBWriteBatchWrapper(backend.db, backend.getWriteOptions(), backend.getWriteBatchSize())) { final byte[] keyPrefixBytes = serializeCurrentKeyWithGroupAndNamespace(); @@ -577,7 +577,7 @@ class RocksDBMapState<K, N, UK, UV> // use try-with-resources to ensure RocksIterator can be release even some runtime exception // occurred in the below code block. - try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnFamily, backend.getReadOptions())) { + try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnFamily)) { /* * The iteration starts from the prefix bytes at the first loading. After #nextEntry() is called, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java index 0f564d5..1455c1b 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java @@ -32,7 +32,6 @@ import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; -import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.slf4j.Logger; @@ -95,18 +94,12 @@ public class RocksDBOperationUtils { return dbRef; } - public static RocksIteratorWrapper getRocksIterator(RocksDB db, ColumnFamilyHandle columnFamilyHandle, ReadOptions readOptions) { - return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle, readOptions)); + public static RocksIteratorWrapper getRocksIterator(RocksDB db) { + return new RocksIteratorWrapper(db.newIterator()); } - /** - * Create a total order read option to avoid user misuse, see FLINK-17800 for more details. - * - * <p>Note, remember to close the generated {@link ReadOptions} when dispose. - */ - // TODO We would remove this method once we bump RocksDB version larger than 6.2.2. - public static ReadOptions createTotalOrderSeekReadOptions() { - return new ReadOptions().setTotalOrderSeek(true); + public static RocksIteratorWrapper getRocksIterator(RocksDB db, ColumnFamilyHandle columnFamilyHandle) { + return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle)); } public static void registerKvStateInformation( diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java index 59daff8..1cbeebe 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java @@ -17,7 +17,6 @@ package org.apache.flink.contrib.streaming.state; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.core.memory.DataInputDeserializer; @@ -37,7 +36,6 @@ import org.apache.flink.util.StateMigrationException; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; -import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import javax.annotation.Nonnull; @@ -54,8 +52,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory { /** * Default cache size per key-group. */ - @VisibleForTesting - static final int DEFAULT_CACHES_SIZE = 128; //TODO make this configurable + private static final int DEFAULT_CACHES_SIZE = 128; //TODO make this configurable /** * A shared buffer to serialize elements for the priority queue. @@ -74,7 +71,6 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory { private final int numberOfKeyGroups; private final Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation; private final RocksDB db; - private final ReadOptions readOptions; private final RocksDBWriteBatchWrapper writeBatchWrapper; private final RocksDBNativeMetricMonitor nativeMetricMonitor; private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory; @@ -85,7 +81,6 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory { int numberOfKeyGroups, Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation, RocksDB db, - ReadOptions readOptions, RocksDBWriteBatchWrapper writeBatchWrapper, RocksDBNativeMetricMonitor nativeMetricMonitor, Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory) { @@ -94,7 +89,6 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory { this.numberOfKeyGroups = numberOfKeyGroups; this.kvStateInformation = kvStateInformation; this.db = db; - this.readOptions = readOptions; this.writeBatchWrapper = writeBatchWrapper; this.nativeMetricMonitor = nativeMetricMonitor; this.columnFamilyOptionsFactory = columnFamilyOptionsFactory; @@ -128,7 +122,6 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory { keyGroupId, keyGroupPrefixBytes, db, - readOptions, columnFamilyHandle, byteOrderedElementSerializer, sharedElementOutView, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java index 53c7537..f7abed6 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java @@ -51,7 +51,6 @@ import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; -import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.slf4j.Logger; @@ -310,7 +309,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i)) .columnFamilyHandle; - try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle, tmpRestoreDBInfo.readOptions)) { + try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle)) { iterator.seek(startKeyGroupPrefixBytes); @@ -377,8 +376,6 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor @Nonnull private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots; - private final ReadOptions readOptions; - private RestoredDBInstance( @Nonnull RocksDB db, @Nonnull List<ColumnFamilyHandle> columnFamilyHandles, @@ -389,7 +386,6 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor this.columnFamilyHandles = columnFamilyHandles; this.columnFamilyDescriptors = columnFamilyDescriptors; this.stateMetaInfoSnapshots = stateMetaInfoSnapshots; - this.readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions(); } @Override @@ -401,7 +397,6 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor IOUtils.closeAllQuietly(columnFamilyHandles); IOUtils.closeQuietly(db); IOUtils.closeAllQuietly(columnFamilyOptions); - IOUtils.closeQuietly(readOptions); } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java index fd68d70..d402c3d 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java @@ -60,7 +60,6 @@ public class KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest extends Intern keyGroupId, keyGroupPrefixBytes, rocksDBResource.getRocksDB(), - rocksDBResource.getReadOptions(), rocksDBResource.getDefaultColumnFamily(), TestElementSerializer.INSTANCE, outputStreamWithPos, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java index 3b3b697..d25baa7 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java @@ -101,7 +101,7 @@ public class RocksDBResource extends ExternalResource { LOG.error("Close previous ColumnOptions's instance failed.", e); } - return PredefinedOptions.FLASH_SSD_OPTIMIZED.createColumnOptions(handlesToClose).optimizeForPointLookup(40960); + return PredefinedOptions.FLASH_SSD_OPTIMIZED.createColumnOptions(handlesToClose); } }); } @@ -155,7 +155,7 @@ public class RocksDBResource extends ExternalResource { PredefinedOptions.DEFAULT.createColumnOptions(handlesToClose), handlesToClose); this.writeOptions = new WriteOptions(); this.writeOptions.disableWAL(); - this.readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions(); + this.readOptions = new ReadOptions(); this.columnFamilyHandles = new ArrayList<>(1); this.rocksDB = RocksDB.open( dbOptions, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java index 1a4808f..8f71b65 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java @@ -126,7 +126,7 @@ public class RocksDBRocksStateKeysIteratorTest { ColumnFamilyHandle handle = keyedStateBackend.getColumnFamilyHandle(testStateName); try ( - RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(keyedStateBackend.db, handle, keyedStateBackend.getReadOptions()); + RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(keyedStateBackend.db, handle); RocksStateKeysIterator<K> iteratorWrapper = new RocksStateKeysIterator<>( iterator, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index cf1fdfa..2167a00 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -59,7 +59,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import static org.apache.flink.contrib.streaming.state.RocksDBTestUtils.createKeyedStateBackend; import static org.hamcrest.CoreMatchers.anyOf; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; @@ -122,7 +121,7 @@ public class RocksDBStateBackendConfigTest { assertArrayEquals(new String[] { testDir1, testDir2 }, rocksDbBackend.getDbStoragePaths()); final MockEnvironment env = getMockEnvironment(tempFolder.newFolder()); - final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE); + final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env); try { File instanceBasePath = keyedBackend.getInstanceBasePath(); @@ -159,7 +158,7 @@ public class RocksDBStateBackendConfigTest { RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString()); - RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE); + RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env); Assert.assertEquals(RocksDBPriorityQueueSetFactory.class, keyedBackend.getPriorityQueueFactory().getClass()); keyedBackend.dispose(); @@ -169,7 +168,7 @@ public class RocksDBStateBackendConfigTest { RocksDBStateBackend.PriorityQueueStateType.HEAP.toString()); rocksDbBackend = rocksDbBackend.configure(conf, Thread.currentThread().getContextClassLoader()); - keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE); + keyedBackend = createKeyedStateBackend(rocksDbBackend, env); Assert.assertEquals( HeapPriorityQueueSetFactory.class, keyedBackend.getPriorityQueueFactory().getClass()); @@ -198,7 +197,7 @@ public class RocksDBStateBackendConfigTest { final RocksDBStateBackend configuredRocksDBStateBackend = backend.configure( configFromConfFile, Thread.currentThread().getContextClassLoader()); - final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(configuredRocksDBStateBackend, env, IntSerializer.INSTANCE); + final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(configuredRocksDBStateBackend, env); // priorityQueueStateType of the job backend should be preserved assertThat(keyedBackend.getPriorityQueueFactory(), instanceOf(HeapPriorityQueueSetFactory.class)); @@ -255,7 +254,7 @@ public class RocksDBStateBackendConfigTest { rocksDbBackend.setDbStoragePath(configuredPath); final MockEnvironment env = getMockEnvironment(tempFolder.newFolder()); - RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE); + RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env); try { File instanceBasePath = keyedBackend.getInstanceBasePath(); @@ -726,6 +725,23 @@ public class RocksDBStateBackendConfigTest { // Utilities // ------------------------------------------------------------------------ + static RocksDBKeyedStateBackend<Integer> createKeyedStateBackend( + RocksDBStateBackend rocksDbBackend, Environment env) throws Exception { + + return (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.createKeyedStateBackend( + env, + env.getJobID(), + "test_op", + IntSerializer.INSTANCE, + 1, + new KeyGroupRange(0, 0), + env.getTaskKvStateRegistry(), + TtlTimeProvider.DEFAULT, + new UnregisteredMetricsGroup(), + Collections.emptyList(), + new CloseableRegistry()); + } + static MockEnvironment getMockEnvironment(File... tempDirs) { final String[] tempDirStrings = new String[tempDirs.length]; for (int i = 0; i < tempDirs.length; i++) { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java deleted file mode 100644 index 59a4822..0000000 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.streaming.state; - -import org.apache.flink.api.common.state.MapState; -import org.apache.flink.api.common.state.MapStateDescriptor; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.common.typeutils.base.LongSerializer; -import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; -import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; -import org.apache.flink.runtime.state.VoidNamespace; -import org.apache.flink.runtime.state.VoidNamespaceSerializer; -import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer; -import org.apache.flink.streaming.api.operators.TimerSerializer; - -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.rocksdb.ColumnFamilyOptions; -import org.rocksdb.DBOptions; - -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.PriorityQueue; -import java.util.concurrent.ThreadLocalRandom; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import static org.apache.flink.contrib.streaming.state.RocksDBTestUtils.createKeyedStateBackend; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * Tests to cover cases that even user misuse some options, RocksDB state-backend could still work as expected or give explicit feedback. - * - * <p>RocksDB state-backend has some internal operations based on RocksDB's APIs which is transparent for users. - * However, user could still configure options via {@link RocksDBOptionsFactory}, and might lead some operations - * could not get expected result, e.g. FLINK-17800 - */ -public class RocksDBStateMisuseOptionTest { - - @Rule - public final TemporaryFolder tempFolder = new TemporaryFolder(); - - /** - * Tests to cover case when user misuse optimizeForPointLookup with iterator interfaces on map state. - * - * <p>The option {@link ColumnFamilyOptions#optimizeForPointLookup(long)} would lead to iterator.seek with prefix bytes invalid. - */ - @Test - public void testMisuseOptimizePointLookupWithMapState() throws Exception { - RocksDBStateBackend rocksDBStateBackend = createStateBackendWithOptimizePointLookup(); - RocksDBKeyedStateBackend<Integer> keyedStateBackend = createKeyedStateBackend(rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE); - MapStateDescriptor<Integer, Long> stateDescriptor = new MapStateDescriptor<>("map", IntSerializer.INSTANCE, LongSerializer.INSTANCE); - MapState<Integer, Long> mapState = keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor); - - keyedStateBackend.setCurrentKey(1); - Map<Integer, Long> expectedResult = new HashMap<>(); - for (int i = 0; i < 100; i++) { - long uv = ThreadLocalRandom.current().nextLong(); - mapState.put(i, uv); - expectedResult.put(i, uv); - } - - Iterator<Map.Entry<Integer, Long>> iterator = mapState.entries().iterator(); - while (iterator.hasNext()) { - Map.Entry<Integer, Long> entry = iterator.next(); - assertEquals(entry.getValue(), expectedResult.remove(entry.getKey())); - iterator.remove(); - } - assertTrue(expectedResult.isEmpty()); - assertTrue(mapState.isEmpty()); - } - - /** - * Tests to cover case when user misuse optimizeForPointLookup with peek operations on priority queue. - * - * <p>The option {@link ColumnFamilyOptions#optimizeForPointLookup(long)} would lead to iterator.seek with prefix bytes invalid. - */ - @Test - public void testMisuseOptimizePointLookupWithPriorityQueue() throws IOException { - RocksDBStateBackend rocksDBStateBackend = createStateBackendWithOptimizePointLookup(); - RocksDBKeyedStateBackend<Integer> keyedStateBackend = createKeyedStateBackend(rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE); - KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> priorityQueue = - keyedStateBackend.create("timer", new TimerSerializer<>(keyedStateBackend.getKeySerializer(), VoidNamespaceSerializer.INSTANCE)); - - PriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> expectedPriorityQueue = new PriorityQueue<>((o1, o2) -> (int) (o1.getTimestamp() - o2.getTimestamp())); - // ensure we insert timers more than cache capacity. - int queueSize = RocksDBPriorityQueueSetFactory.DEFAULT_CACHES_SIZE + 42; - List<Integer> timeStamps = IntStream.range(0, queueSize).boxed().collect(Collectors.toList()); - Collections.shuffle(timeStamps); - for (Integer timeStamp : timeStamps) { - TimerHeapInternalTimer<Integer, VoidNamespace> timer = new TimerHeapInternalTimer<>(timeStamp, timeStamp, VoidNamespace.INSTANCE); - priorityQueue.add(timer); - expectedPriorityQueue.add(timer); - } - assertEquals(queueSize, priorityQueue.size()); - TimerHeapInternalTimer<Integer, VoidNamespace> timer; - while ((timer = priorityQueue.poll()) != null) { - assertEquals(expectedPriorityQueue.poll(), timer); - } - assertTrue(expectedPriorityQueue.isEmpty()); - assertTrue(priorityQueue.isEmpty()); - - } - - private RocksDBStateBackend createStateBackendWithOptimizePointLookup() throws IOException { - RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI(), true); - rocksDBStateBackend.setPriorityQueueStateType(RocksDBStateBackend.PriorityQueueStateType.ROCKSDB); - rocksDBStateBackend.setRocksDBOptions(new RocksDBOptionsFactory() { - - private static final long serialVersionUID = 1L; - - @Override - public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) { - return currentOptions; - } - - @Override - public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) { - return currentOptions.optimizeForPointLookup(64); - } - }); - return rocksDBStateBackend; - } -} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java index cee56aa..ed44a73 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; -import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.state.KeyGroupRange; @@ -36,7 +35,6 @@ import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.RocksDB; import java.io.File; -import java.io.IOException; import java.util.Collections; /** @@ -100,23 +98,4 @@ public final class RocksDBTestUtils { defaultCFHandle, new CloseableRegistry()); } - - public static <K> RocksDBKeyedStateBackend<K> createKeyedStateBackend( - RocksDBStateBackend rocksDbBackend, - Environment env, - TypeSerializer<K> keySerializer) throws IOException { - - return (RocksDBKeyedStateBackend<K>) rocksDbBackend.createKeyedStateBackend( - env, - env.getJobID(), - "test_op", - keySerializer, - 1, - new KeyGroupRange(0, 0), - env.getTaskKvStateRegistry(), - TtlTimeProvider.DEFAULT, - new UnregisteredMetricsGroup(), - Collections.emptyList(), - new CloseableRegistry()); - } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java index 2c73fc7..4447f08 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java @@ -30,7 +30,6 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; -import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import java.io.DataOutputStream; @@ -75,8 +74,7 @@ public class RocksKeyGroupsRocksSingleStateIteratorTest { public void testMergeIterator(int maxParallelism) throws Exception { Random random = new Random(1234); - try (ReadOptions readOptions = new ReadOptions(); - RocksDB rocksDB = RocksDB.open(tempFolder.getRoot().getAbsolutePath())) { + try (RocksDB rocksDB = RocksDB.open(tempFolder.getRoot().getAbsolutePath())) { List<Tuple2<RocksIteratorWrapper, Integer>> rocksIteratorsWithKVStateId = new ArrayList<>(); List<Tuple2<ColumnFamilyHandle, Integer>> columnFamilyHandlesWithKeyCount = new ArrayList<>(); @@ -110,7 +108,7 @@ public class RocksKeyGroupsRocksSingleStateIteratorTest { int id = 0; for (Tuple2<ColumnFamilyHandle, Integer> columnFamilyHandle : columnFamilyHandlesWithKeyCount) { - rocksIteratorsWithKVStateId.add(new Tuple2<>(RocksDBOperationUtils.getRocksIterator(rocksDB, columnFamilyHandle.f0, readOptions), id)); + rocksIteratorsWithKVStateId.add(new Tuple2<>(RocksDBOperationUtils.getRocksIterator(rocksDB, columnFamilyHandle.f0), id)); ++id; } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java index ce06d1f..96003d6 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java @@ -19,6 +19,7 @@ package org.apache.flink.contrib.streaming.state.benchmark; import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend; +import org.apache.flink.contrib.streaming.state.RocksDBOperationUtils; import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper; import org.apache.flink.core.memory.MemoryUtils; import org.apache.flink.testutils.junit.RetryOnFailure; @@ -166,7 +167,7 @@ public class RocksDBPerformanceTest extends TestLogger { int pos = 0; - try (final RocksIteratorWrapper iterator = new RocksIteratorWrapper(rocksDB.newIterator())) { + try (final RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(rocksDB)) { // seek to start unsafe.putInt(keyTemplate, offset, 0); iterator.seek(keyTemplate); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java index e02901b..a6194ed 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java @@ -50,7 +50,7 @@ public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N>, */ private transient int timerHeapIndex; - public TimerHeapInternalTimer(long timestamp, @Nonnull K key, @Nonnull N namespace) { + TimerHeapInternalTimer(long timestamp, @Nonnull K key, @Nonnull N namespace) { this.timestamp = timestamp; this.key = key; this.namespace = namespace;