GitHub user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5518#discussion_r169946359
--- Diff:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
---
@@ -253,22 +257,61 @@ public RocksDBKeyedStateBackend(
this.restoredKvStateMetaInfos = new HashMap<>();
this.materializedSstFiles = new TreeMap<>();
this.backendUID = UUID.randomUUID();
+ this.namespaceOutputStream = new ByteArrayOutputStreamWithPos(8);
LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID);
}
@Override
- public <N> Stream<K> getKeys(String state, N namespace) {
+ public <N> Stream<K> getKeys(String state, N namespace, TypeSerializer<N> namespaceSerializer) {
Tuple2<ColumnFamilyHandle, ?> columnInfo = kvStateInformation.get(state);
if (columnInfo == null) {
return Stream.empty();
}
- RocksIterator iterator = db.newIterator(columnInfo.f0);
- iterator.seekToFirst();
+ RocksIterator iterator = null;
+ try {
+ iterator = db.newIterator(columnInfo.f0);
+ iterator.seekToFirst();
+
+ boolean ambiguousKeyPossible = AbstractRocksDBState.AbstractRocksDBUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer);
+ final byte[] nameSpaceBytes;
- Iterable<K> iterable = () -> new RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes);
- Stream<K> targetStream = StreamSupport.stream(iterable.spliterator(), false);
- return targetStream.onClose(iterator::close);
+ try {
+ namespaceOutputStream.reset();
+ AbstractRocksDBState.AbstractRocksDBUtils.writeNameSpace(
+ namespace,
+ namespaceSerializer,
+ namespaceOutputStream,
+ new DataOutputViewStreamWrapper(namespaceOutputStream),
+ ambiguousKeyPossible);
+ nameSpaceBytes = namespaceOutputStream.toByteArray();
+ } catch (IOException ex) {
+ throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex);
+ }
+
+ final RocksIteratorWrapper<K> iteratorWrapper = new RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes,
+ ambiguousKeyPossible, nameSpaceBytes);
+
+ Stream<K> targetStream = StreamSupport.stream(((Iterable<K>)()->iteratorWrapper).spliterator(), false);
+ return targetStream.onClose(() -> {
+ try {
+ iteratorWrapper.close();
+ } catch (Exception ex) {
+ LOG.warn("Release RocksIteratorWrapper failed.", ex);
+ }
+ });
+ } catch (Exception ex) {
--- End diff --
As mentioned in my previous comment, this `try-catch` block is not needed if
we create the iterator further down, after the namespace serialization, at a
point where no more exceptions can occur, i.e.:
```
(...)
RocksIterator iterator = db.newIterator(columnInfo.f0);
iterator.seekToFirst();
final RocksIteratorWrapper<K> iteratorWrapper = new RocksIteratorWrapper<>(
    iterator, state, keySerializer, keyGroupPrefixBytes, ambiguousKeyPossible, nameSpaceBytes);
Stream<K> targetStream = StreamSupport.stream(((Iterable<K>) () -> iteratorWrapper).spliterator(), false);
return targetStream.onClose(iterator::close);
```
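
To show the ordering in isolation, here is a minimal, self-contained sketch of the same idea: do everything that can throw first, and only then open the resource and hand its `close` to the stream. It does not use Flink or RocksDB; `AcquireLastSketch`, `FakeNativeIterator`, `serializeNamespace` and the `OPEN` counter are hypothetical stand-ins invented for this illustration, and a `String` namespace replaces the real serializer.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class AcquireLastSketch {

    /** Hypothetical stand-in for a native iterator that must be closed (e.g. RocksIterator). */
    static final class FakeNativeIterator implements Iterator<String>, AutoCloseable {
        /** Number of currently open iterators; lets a caller check for leaks. */
        static final AtomicInteger OPEN = new AtomicInteger();

        private final Iterator<String> delegate;
        private final byte[] namespaceBytes; // kept only to mirror the wrapper's constructor

        FakeNativeIterator(List<String> keys, byte[] namespaceBytes) {
            this.delegate = keys.iterator();
            this.namespaceBytes = namespaceBytes;
            OPEN.incrementAndGet();
        }

        @Override public boolean hasNext() { return delegate.hasNext(); }
        @Override public String next() { return delegate.next(); }
        @Override public void close() { OPEN.decrementAndGet(); }
    }

    /** The fallible step: serialization may throw, so it runs before anything is opened. */
    static byte[] serializeNamespace(String namespace) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream(8);
            new DataOutputStream(bytes).writeUTF(namespace); // throws NullPointerException for null
            return bytes.toByteArray();
        } catch (IOException ex) {
            throw new UncheckedIOException("Failed to serialize namespace.", ex);
        }
    }

    static Stream<String> getKeys(List<String> keys, String namespace) {
        // 1) Everything that can fail happens while no resource is held.
        byte[] namespaceBytes = serializeNamespace(namespace);

        // 2) Open the resource last; the remaining statements in this sketch do not throw.
        FakeNativeIterator iterator = new FakeNativeIterator(keys, namespaceBytes);
        Iterable<String> iterable = () -> iterator;

        // 3) The caller releases the iterator by closing the stream.
        return StreamSupport.stream(iterable.spliterator(), false).onClose(iterator::close);
    }
}
```

A caller would consume the result in a try-with-resources block, for example `try (Stream<String> keys = AcquireLastSketch.getKeys(List.of("a", "b"), "ns")) { ... }`, so that `onClose` runs. If serialization fails, no iterator has been created yet, which is why no surrounding `try-catch` is needed for cleanup.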
---