Myasuka commented on a change in pull request #13179:
URL: https://github.com/apache/flink/pull/13179#discussion_r483451813



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
##########
@@ -75,6 +76,13 @@
         */
        <N> Stream<K> getKeys(String state, N namespace);
 
+       /**
+        * @return A stream of all keys for the given state and namespace. Modifications to the state during iterating
+        *                 over it keys are not supported.
+        * @param state State variable for which existing keys will be returned.

Review comment:
       I think we should state explicitly that the order of the returned tuples is not guaranteed, i.e. tuples from different namespaces may be interleaved rather than visited one namespace at a time. What do you think?
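   For example, a caller that needs the keys of each namespace would have to group the pairs itself rather than assume they arrive namespace by namespace. A minimal sketch, assuming `Map.Entry<String, String>` (key, namespace) as a stand-in for Flink's `Tuple2<K, N>`; `GroupByNamespace` is a hypothetical helper, not part of this PR:

   ~~~java
   import java.util.Map;
   import java.util.TreeMap;
   import java.util.TreeSet;
   import java.util.stream.Collectors;
   import java.util.stream.Stream;

   final class GroupByNamespace {

       /**
        * Collects (key, namespace) pairs into namespace -> sorted keys. The result is the
        * same for any arrival order, so interleaved namespaces are handled correctly.
        */
       static Map<String, TreeSet<String>> group(Stream<Map.Entry<String, String>> keysAndNamespaces) {
           return keysAndNamespaces.collect(Collectors.groupingBy(
                   Map.Entry::getValue, // the namespace is the grouping key
                   TreeMap::new,
                   Collectors.mapping(Map.Entry::getKey, Collectors.toCollection(TreeSet::new))));
       }
   }
   ~~~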

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysAndNamespaceIterator.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Adapter class to bridge between {@link RocksIteratorWrapper} and {@link Iterator} to iterate over the keys
+ * and namespaces. This class is not thread safe.
+ *
+ * @param <K> the type of the iterated keys in RocksDB.
+ * @param <N> the type of the iterated namespaces in RocksDB.
+ */
+public class RocksStateKeysAndNamespaceIterator<K, N> implements Iterator<Tuple2<K, N>>, AutoCloseable {

Review comment:
       I tried implementing an abstract base class and a subclass that extends it, like this:
   
   ~~~java
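   import org.apache.flink.api.common.typeutils.TypeSerializer;
   import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
   import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
   import org.apache.flink.core.memory.DataInputDeserializer;
   
   import javax.annotation.Nonnull;
   
   import java.io.IOException;
   
   /** Shared fields, key deserialization and cleanup for the RocksDB state-key iterators. */
   public abstract class AbstractRocksStateKeysIterator<K> implements AutoCloseable {
        @Nonnull
        protected final RocksIteratorWrapper iterator;
   
        @Nonnull
        protected final String state;
   
        @Nonnull
        protected final TypeSerializer<K> keySerializer;
   
        protected final boolean ambiguousKeyPossible;
        protected final int keyGroupPrefixBytes;
        protected final DataInputDeserializer byteArrayDataInputView;
   
        public AbstractRocksStateKeysIterator(
                @Nonnull RocksIteratorWrapper iterator,
                @Nonnull String state,
                @Nonnull TypeSerializer<K> keySerializer,
                int keyGroupPrefixBytes,
                boolean ambiguousKeyPossible) {
                this.iterator = iterator;
                this.state = state;
                this.keySerializer = keySerializer;
                this.keyGroupPrefixBytes = keyGroupPrefixBytes;
                this.ambiguousKeyPossible = ambiguousKeyPossible;
                this.byteArrayDataInputView = new DataInputDeserializer();
        }
   
        /**
         * Skips the key-group prefix of {@code keyBytes} and deserializes the key. Afterwards,
         * {@code readView.getPosition()} is the index in {@code keyBytes} where the namespace starts.
         */
        protected K deserializeKey(byte[] keyBytes, DataInputDeserializer readView) throws IOException {
                readView.setBuffer(keyBytes, keyGroupPrefixBytes, keyBytes.length - keyGroupPrefixBytes);
                // Read from the view that was just positioned, not from the field.
                return RocksDBKeySerializationUtils.readKey(
                        keySerializer,
                        readView,
                        ambiguousKeyPossible);
        }
   
        @Override
        public void close() {
                iterator.close();
        }
   }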
   ~~~
   
   ~~~java
   import org.apache.flink.api.common.typeutils.TypeSerializer;
   import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
   import org.apache.flink.util.FlinkRuntimeException;
   
   import javax.annotation.Nonnull;
   
   import java.util.Iterator;
   import java.util.NoSuchElementException;
   import java.util.Objects;
   
   /** Iterates over the distinct keys of a state that have an entry in the given namespace. */
   public class RocksStateKeysIterator<K> extends AbstractRocksStateKeysIterator<K> implements Iterator<K> {
   
        @Nonnull
        private final byte[] namespaceBytes;
   
        private K nextKey;
        private K previousKey;
   
        public RocksStateKeysIterator(
                @Nonnull RocksIteratorWrapper iterator,
                @Nonnull String state,
                @Nonnull TypeSerializer<K> keySerializer,
                int keyGroupPrefixBytes,
                boolean ambiguousKeyPossible,
                @Nonnull byte[] namespaceBytes) {
                super(iterator, state, keySerializer, keyGroupPrefixBytes, ambiguousKeyPossible);
                this.namespaceBytes = namespaceBytes;
                this.nextKey = null;
                this.previousKey = null;
        }
   
        @Override
        public boolean hasNext() {
                try {
                        while (nextKey == null && iterator.isValid()) {
   
                                final byte[] keyBytes = iterator.key();
                                final K currentKey = deserializeKey(keyBytes, byteArrayDataInputView);
                                // The namespace bytes start right after the deserialized key.
                                final int namespaceByteStartPos = byteArrayDataInputView.getPosition();
   
                                // Emit each matching key once; duplicates of the previously emitted key are skipped.
                                if (isMatchingNameSpace(keyBytes, namespaceByteStartPos) && !Objects.equals(previousKey, currentKey)) {
                                        previousKey = currentKey;
                                        nextKey = currentKey;
                                }
                                iterator.next();
                        }
                } catch (Exception e) {
                        throw new FlinkRuntimeException("Failed to access state [" + state + "]", e);
                }
                return nextKey != null;
        }
   
        @Override
        public K next() {
                if (!hasNext()) {
                        throw new NoSuchElementException("Failed to access state [" + state + "]");
                }
   
                K tmpKey = nextKey;
                nextKey = null;
                return tmpKey;
        }
   
        /** Checks whether {@code namespaceBytes} occurs in {@code key} starting at {@code beginPos}. */
        private boolean isMatchingNameSpace(@Nonnull byte[] key, int beginPos) {
                final int namespaceBytesLength = namespaceBytes.length;
                final int basicLength = namespaceBytesLength + beginPos;
                if (key.length >= basicLength) {
                        for (int i = 0; i < namespaceBytesLength; ++i) {
                                if (key[beginPos + i] != namespaceBytes[i]) {
                                        return false;
                                }
                        }
                        return true;
                }
                return false;
        }
   }
   ~~~
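   For reference, the per-entry work in `hasNext()` is: skip the key-group prefix, deserialize the key, then compare the bytes that follow with `namespaceBytes`. The sketch below walks through those steps on a hypothetical, simplified layout (`[key-group prefix][4-byte int key][namespace bytes]`); real keys are written by the key `TypeSerializer` and need not be fixed-width, and `CompositeKeySketch` is not a Flink class. It uses the range overload of `Arrays.equals`, which needs Java 9+, so it would not compile on a Java 8 toolchain.

   ~~~java
   import java.nio.ByteBuffer;
   import java.util.Arrays;

   /** Hypothetical composite-key layout: [key-group prefix][4-byte int key][namespace bytes]. */
   final class CompositeKeySketch {

       /** Builds a composite key; the prefix bytes are left as zeros. */
       static byte[] compose(int keyGroupPrefixBytes, int key, byte[] namespace) {
           ByteBuffer buf = ByteBuffer.allocate(keyGroupPrefixBytes + Integer.BYTES + namespace.length);
           buf.position(keyGroupPrefixBytes);
           buf.putInt(key);
           buf.put(namespace);
           return buf.array();
       }

       /** Reads the key after skipping the key-group prefix (cf. deserializeKey). */
       static int readKey(byte[] composite, int keyGroupPrefixBytes) {
           return ByteBuffer.wrap(composite, keyGroupPrefixBytes, Integer.BYTES).getInt();
       }

       /** Same check as isMatchingNameSpace: namespace bytes must start at beginPos. */
       static boolean isMatchingNamespace(byte[] composite, int beginPos, byte[] namespace) {
           int end = beginPos + namespace.length;
           if (beginPos < 0 || end > composite.length) {
               return false; // guard first: Arrays.equals throws on out-of-range indices
           }
           return Arrays.equals(composite, beginPos, end, namespace, 0, namespace.length);
       }
   }
   ~~~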
   
   In my view, `RocksStateKeysAndNamespaceIterator` and `RocksStateKeysIterator` could share part of this code.
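   A self-contained, in-memory sketch of how far the sharing could go. It is a variant of the classes above, not what they do: the `hasNext()`/`next()` look-ahead loop also moves into the base class, and each subclass only decides what (if anything) to emit per raw entry. All class names are hypothetical, and a plain `Iterator<Map.Entry<K, N>>` of already-deserialized (key, namespace) entries stands in for `RocksIteratorWrapper` plus `deserializeKey`.

   ~~~java
   import java.util.AbstractMap.SimpleEntry;
   import java.util.Iterator;
   import java.util.Map;
   import java.util.NoSuchElementException;
   import java.util.Objects;

   /** Owns the raw source, the look-ahead element and close(); subclasses pick what to emit. */
   abstract class AbstractKeysIterator<K, N, T> implements Iterator<T>, AutoCloseable {

       // Stand-in for RocksIteratorWrapper + deserializeKey: already-decoded entries.
       private final Iterator<Map.Entry<K, N>> source;
       private T nextElement;
       private boolean closed;

       AbstractKeysIterator(Iterator<Map.Entry<K, N>> source) {
           this.source = source;
       }

       /** Returns the element to emit for this raw entry, or null to skip it. */
       protected abstract T accept(K key, N namespace);

       @Override
       public boolean hasNext() {
           while (nextElement == null && !closed && source.hasNext()) {
               Map.Entry<K, N> raw = source.next();
               nextElement = accept(raw.getKey(), raw.getValue());
           }
           return nextElement != null;
       }

       @Override
       public T next() {
           if (!hasNext()) {
               throw new NoSuchElementException();
           }
           T result = nextElement;
           nextElement = null;
           return result;
       }

       @Override
       public void close() {
           // A RocksDB-backed version would release the native iterator here.
           closed = true;
       }
   }

   /** Distinct keys in one namespace, in the spirit of RocksStateKeysIterator. */
   final class KeysIterator<K, N> extends AbstractKeysIterator<K, N, K> {
       private final N namespace;
       private K previousKey;

       KeysIterator(Iterator<Map.Entry<K, N>> source, N namespace) {
           super(source);
           this.namespace = namespace;
       }

       @Override
       protected K accept(K key, N ns) {
           if (Objects.equals(namespace, ns) && !Objects.equals(previousKey, key)) {
               previousKey = key;
               return key;
           }
           return null;
       }
   }

   /** All (key, namespace) pairs, in the spirit of RocksStateKeysAndNamespaceIterator. */
   final class KeysAndNamespaceIterator<K, N> extends AbstractKeysIterator<K, N, Map.Entry<K, N>> {
       KeysAndNamespaceIterator(Iterator<Map.Entry<K, N>> source) {
           super(source);
       }

       @Override
       protected Map.Entry<K, N> accept(K key, N ns) {
           return new SimpleEntry<>(key, ns);
       }
   }
   ~~~

   The trade-off is that the base class then fixes the look-ahead strategy for both iterators; keeping `hasNext()` in the subclasses, as in the code above, leaves each iterator free to advance the RocksDB iterator differently.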




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
