Myasuka commented on a change in pull request #13179:
URL: https://github.com/apache/flink/pull/13179#discussion_r483451813
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
##########
@@ -75,6 +76,13 @@
*/
<N> Stream<K> getKeys(String state, N namespace);
+ /**
+ * @return A stream of all keys for the given state and namespace. Modifications to the state during iterating
+ * over it keys are not supported.
+ * @param state State variable for which existing keys will be returned.
Review comment:
I think we should state explicitly that the order of the returned tuples is
not guaranteed, which means tuples from different namespaces may be
interleaved during iteration. What do you think?
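To illustrate why the Javadoc should call this out, here is a minimal sketch of a caller that does not rely on any ordering. It is not code from this PR: `UnorderedKeysAndNamespaces` and `groupByNamespace` are hypothetical names, and `Map.Entry<K, N>` stands in for Flink's `Tuple2<K, N>` so that the example runs with the JDK alone.

~~~java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink.
class UnorderedKeysAndNamespaces {

    /** Groups (key, namespace) pairs by namespace without assuming any arrival order. */
    static <K, N> Map<N, List<K>> groupByNamespace(Iterable<Map.Entry<K, N>> keysAndNamespaces) {
        Map<N, List<K>> grouped = new LinkedHashMap<>();
        for (Map.Entry<K, N> pair : keysAndNamespaces) {
            // The namespace may change on every element, so look the bucket up each time.
            grouped.computeIfAbsent(pair.getValue(), ns -> new ArrayList<>()).add(pair.getKey());
        }
        return grouped;
    }

    public static void main(String[] args) {
        // Assumed input: namespaces "w1" and "w2" arrive interleaved.
        List<Map.Entry<String, String>> pairs = new ArrayList<>();
        pairs.add(new SimpleEntry<>("a", "w1"));
        pairs.add(new SimpleEntry<>("a", "w2"));
        pairs.add(new SimpleEntry<>("b", "w1"));
        pairs.add(new SimpleEntry<>("b", "w2"));

        System.out.println(groupByNamespace(pairs)); // prints {w1=[a, b], w2=[a, b]}
    }
}
~~~

A caller that instead assumed all tuples of one namespace arrive together would silently produce wrong results on such input, which is why I would like the contract to be explicit.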
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysAndNamespaceIterator.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Adapter class to bridge between {@link RocksIteratorWrapper} and {@link Iterator} to iterate over the keys
+ * and namespaces. This class is not thread safe.
+ *
+ * @param <K> the type of the iterated keys in RocksDB.
+ * @param <N> the type of the iterated namespaces in RocksDB.
+ */
+public class RocksStateKeysAndNamespaceIterator<K, N> implements Iterator<Tuple2<K, N>>, AutoCloseable {
Review comment:
I tried implementing an abstract base class and a subclass of it like this:
~~~java
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
import org.apache.flink.core.memory.DataInputDeserializer;

import javax.annotation.Nonnull;

import java.io.IOException;

/** Base class holding the fields and key deserialization shared by the key iterators. */
public abstract class AbstractRocksStateKeysIterator<K> implements AutoCloseable {

    @Nonnull
    protected final RocksIteratorWrapper iterator;

    @Nonnull
    protected final String state;

    @Nonnull
    protected final TypeSerializer<K> keySerializer;

    protected final boolean ambiguousKeyPossible;
    protected final int keyGroupPrefixBytes;
    protected final DataInputDeserializer byteArrayDataInputView;

    public AbstractRocksStateKeysIterator(
            @Nonnull RocksIteratorWrapper iterator,
            @Nonnull String state,
            @Nonnull TypeSerializer<K> keySerializer,
            int keyGroupPrefixBytes,
            boolean ambiguousKeyPossible) {
        this.iterator = iterator;
        this.state = state;
        this.keySerializer = keySerializer;
        this.keyGroupPrefixBytes = keyGroupPrefixBytes;
        this.ambiguousKeyPossible = ambiguousKeyPossible;
        this.byteArrayDataInputView = new DataInputDeserializer();
    }

    /** Skips the key-group prefix and reads the key; readView is left positioned after the key. */
    protected K deserializeKey(byte[] keyBytes, DataInputDeserializer readView) throws IOException {
        readView.setBuffer(keyBytes, keyGroupPrefixBytes, keyBytes.length - keyGroupPrefixBytes);
        // Read from the view that was just positioned, not from the field.
        return RocksDBKeySerializationUtils.readKey(keySerializer, readView, ambiguousKeyPossible);
    }

    @Override
    public void close() {
        iterator.close();
    }
}
~~~
~~~java
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
import org.apache.flink.util.FlinkRuntimeException;

import javax.annotation.Nonnull;

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;

/** Iterates over the distinct keys of one namespace. */
public class RocksStateKeysIterator<K> extends AbstractRocksStateKeysIterator<K> implements Iterator<K> {

    @Nonnull
    private final byte[] namespaceBytes;

    private K nextKey;
    private K previousKey;

    public RocksStateKeysIterator(
            @Nonnull RocksIteratorWrapper iterator,
            @Nonnull String state,
            @Nonnull TypeSerializer<K> keySerializer,
            int keyGroupPrefixBytes,
            boolean ambiguousKeyPossible,
            @Nonnull byte[] namespaceBytes) {
        super(iterator, state, keySerializer, keyGroupPrefixBytes, ambiguousKeyPossible);
        this.namespaceBytes = namespaceBytes;
        this.nextKey = null;
        this.previousKey = null;
    }

    @Override
    public boolean hasNext() {
        try {
            // Advance until an entry of the requested namespace with a new key is found.
            while (nextKey == null && iterator.isValid()) {
                final byte[] keyBytes = iterator.key();
                final K currentKey = deserializeKey(keyBytes, byteArrayDataInputView);
                final int namespaceByteStartPos = byteArrayDataInputView.getPosition();

                if (isMatchingNameSpace(keyBytes, namespaceByteStartPos)
                        && !Objects.equals(previousKey, currentKey)) {
                    previousKey = currentKey;
                    nextKey = currentKey;
                }
                iterator.next();
            }
        } catch (Exception e) {
            throw new FlinkRuntimeException("Failed to access state [" + state + "]", e);
        }
        return nextKey != null;
    }

    @Override
    public K next() {
        if (!hasNext()) {
            throw new NoSuchElementException("Failed to access state [" + state + "]");
        }

        K tmpKey = nextKey;
        nextKey = null;
        return tmpKey;
    }

    /** Checks whether the bytes starting at beginPos begin with the serialized namespace. */
    private boolean isMatchingNameSpace(@Nonnull byte[] key, int beginPos) {
        final int namespaceBytesLength = namespaceBytes.length;
        final int basicLength = namespaceBytesLength + beginPos;
        if (key.length >= basicLength) {
            for (int i = 0; i < namespaceBytesLength; ++i) {
                if (key[beginPos + i] != namespaceBytes[i]) {
                    return false;
                }
            }
            return true;
        }
        return false;
    }
}
~~~
From my point of view, `RocksStateKeysAndNamespaceIterator` and
`RocksStateKeysIterator` could share part of their code through such a base
class.
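To make the sharing idea concrete, here is one possible shape, written as a JDK-only sketch rather than as Flink code. Everything in it is an assumption for illustration: `AbstractLookaheadIterator`, `KeysIterator`, and `KeysAndNamespaceIterator` are hypothetical names, a plain `Iterator<Map.Entry<String, String>>` of (key, namespace) pairs stands in for `RocksIteratorWrapper` and the serialized bytes, and the base class also owns the look-ahead loop, which goes a step further than the abstract class above.

~~~java
import java.util.ArrayList;
import java.util.AbstractMap.SimpleEntry;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;

/** Hypothetical shared base: owns the source cursor and the one-element look-ahead. */
abstract class AbstractLookaheadIterator<R, T> implements Iterator<T> {
    private final Iterator<R> source; // stand-in for RocksIteratorWrapper
    private T next;

    AbstractLookaheadIterator(Iterator<R> source) {
        this.source = source;
    }

    /** Returns the element to emit for this raw entry, or null to skip it. */
    protected abstract T accept(R raw);

    @Override
    public boolean hasNext() {
        while (next == null && source.hasNext()) {
            next = accept(source.next());
        }
        return next != null;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = next;
        next = null;
        return result;
    }
}

/** Emits the distinct keys of one namespace. */
final class KeysIterator extends AbstractLookaheadIterator<Map.Entry<String, String>, String> {
    private final String namespace;
    private String previousKey;

    KeysIterator(Iterator<Map.Entry<String, String>> source, String namespace) {
        super(source);
        this.namespace = namespace;
    }

    @Override
    protected String accept(Map.Entry<String, String> raw) {
        if (!namespace.equals(raw.getValue()) || Objects.equals(previousKey, raw.getKey())) {
            return null; // other namespace, or same key as the last emitted one
        }
        previousKey = raw.getKey();
        return raw.getKey();
    }
}

/** Emits every (key, namespace) pair, dropping consecutive duplicates. */
final class KeysAndNamespaceIterator
        extends AbstractLookaheadIterator<Map.Entry<String, String>, Map.Entry<String, String>> {
    private Map.Entry<String, String> previous;

    KeysAndNamespaceIterator(Iterator<Map.Entry<String, String>> source) {
        super(source);
    }

    @Override
    protected Map.Entry<String, String> accept(Map.Entry<String, String> raw) {
        if (Objects.equals(previous, raw)) {
            return null;
        }
        previous = raw;
        return raw;
    }
}

class SharedIteratorDemo {
    /** Assumed sample data: (key, namespace) pairs with one consecutive duplicate. */
    static List<Map.Entry<String, String>> sampleEntries() {
        List<Map.Entry<String, String>> entries = new ArrayList<>();
        entries.add(new SimpleEntry<>("a", "w1"));
        entries.add(new SimpleEntry<>("a", "w1"));
        entries.add(new SimpleEntry<>("a", "w2"));
        entries.add(new SimpleEntry<>("b", "w1"));
        return entries;
    }

    public static void main(String[] args) {
        new KeysIterator(sampleEntries().iterator(), "w1").forEachRemaining(System.out::println);
        new KeysAndNamespaceIterator(sampleEntries().iterator()).forEachRemaining(System.out::println);
    }
}
~~~

With this split, each subclass only decides whether a raw entry should be emitted, while `hasNext()`/`next()` live in one place. Whether the real classes should share that much, or only the fields and `deserializeKey`, is open for discussion.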