Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/4722#discussion_r141258536
--- Diff:
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
---
@@ -1946,4 +1974,44 @@ public File getInstanceBasePath() {
public boolean supportsAsynchronousSnapshots() {
return true;
}
+
+ private static class RocksIteratorWrapper<K> implements Iterator<K> {
+ private final RocksIterator iterator;
+ private final String field;
+ private final TypeSerializer<K> keySerializer;
+ private final int keyGroupPrefixBytes;
+
+ public RocksIteratorWrapper(
+ RocksIterator iterator,
+ String field,
+ TypeSerializer<K> keySerializer,
+ int keyGroupPrefixBytes) {
+ this.iterator = Preconditions.checkNotNull(iterator);
+ this.field = Preconditions.checkNotNull(field);
+ this.keySerializer =
Preconditions.checkNotNull(keySerializer);
+ this.keyGroupPrefixBytes =
Preconditions.checkNotNull(keyGroupPrefixBytes);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iterator.isValid();
+ }
+
+ @Override
+ public K next() {
+ if (!iterator.isValid()) {
--- End diff --
fixed
---