tillrohrmann commented on a change in pull request #13864:
URL: https://github.com/apache/flink/pull/13864#discussion_r517946326



##########
File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
##########
@@ -211,9 +212,9 @@ public void putWorker(MesosWorkerStore.Worker worker) throws Exception {
                synchronized (startStopLock) {
                        verifyIsRunning();
 
-                       int currentVersion = workersInZooKeeper.exists(path);
-                       if (currentVersion == -1) {
-                               workersInZooKeeper.addAndLock(path, worker);
+                       final IntegerResourceVersion currentVersion = workersInZooKeeper.exists(path);
+                       if (currentVersion.isNotExist()) {

Review comment:
       `doesNotExist`, `exists` or `isExisting` might be a bit better

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/persistence/IntegerResourceVersion.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.persistence;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * {@link ResourceVersion} implementation with {@link Integer} value. The resource version in
+ * ZooKeeper is {@link Integer}.
+ */
+public class IntegerResourceVersion implements ResourceVersion, Comparable<IntegerResourceVersion> {
+
+       private static final IntegerResourceVersion NOT_EXIST = new IntegerResourceVersion(-1);

Review comment:
       maybe `NOT_EXISTING`

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/persistence/ResourceVersion.java
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.persistence;
+
+import java.io.Serializable;
+
+/**
+ * Resource version for specific state handle on the underlying storage. The implementation also needs to implement the
+ * {@link Comparable} interface so that we could compare the resource versions.

Review comment:
       Should `ResourceVersion` also extend `Comparable` to enforce this contract?
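
A minimal sketch of what this suggestion could look like, assuming a self-referential type parameter on the interface. The names `ComparableResourceVersion`, `IntVersion`, `isExisting`, `valueOf` and `notExisting` are hypothetical and are not taken from the PR:

```java
import java.io.Serializable;

// Hypothetical variant of the interface: extending Comparable<R> moves the
// "must be comparable" requirement from the JavaDoc into the type system.
interface ComparableResourceVersion<R> extends Comparable<R>, Serializable {

    /** Returns true if the resource exists on the underlying storage. */
    boolean isExisting();
}

// Hypothetical integer-based implementation, loosely modelled on the
// IntegerResourceVersion in the diff above.
final class IntVersion implements ComparableResourceVersion<IntVersion> {

    private static final IntVersion NOT_EXISTING = new IntVersion(-1);

    private final int value;

    private IntVersion(int value) {
        this.value = value;
    }

    static IntVersion valueOf(int value) {
        if (value < 0) {
            throw new IllegalArgumentException("version must be non-negative");
        }
        return new IntVersion(value);
    }

    static IntVersion notExisting() {
        return NOT_EXISTING;
    }

    @Override
    public boolean isExisting() {
        // Compare by value so the check also holds for deserialized copies.
        return value >= 0;
    }

    @Override
    public int compareTo(IntVersion other) {
        return Integer.compare(value, other.value);
    }
}
```

With this shape, a class that implements the interface but omits `compareTo` no longer compiles, so the contract does not depend on implementers reading the JavaDoc.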

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/persistence/ResourceVersion.java
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.persistence;
+
+import java.io.Serializable;
+
+/**
+ * Resource version for specific state handle on the underlying storage. The implementation also needs to implement the
+ * {@link Comparable} interface so that we could compare the resource versions.
+ */
+public interface ResourceVersion extends Serializable {
+
+       boolean isNotExist();

Review comment:
       JavaDocs are missing

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -176,9 +181,13 @@ public ZooKeeperStateHandleStore(
         * @param state           The new state to replace the old one
         * @throws Exception If a ZooKeeper or state handle operation fails
         */
-       public void replace(String pathInZooKeeper, int expectedVersion, T state) throws Exception {
+       @Override
+       public void replace(String pathInZooKeeper, ResourceVersion expectedVersion, T state) throws Exception {
                checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
                checkNotNull(state, "State");
+               checkArgument(
+                       expectedVersion instanceof IntegerResourceVersion,
+                       "Resource version type should be IntegerResourceVersion.");

Review comment:
       The downside would be that the `DefaultStateHandleStore` would get a generic type parameter.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -176,9 +181,13 @@ public ZooKeeperStateHandleStore(
         * @param state           The new state to replace the old one
         * @throws Exception If a ZooKeeper or state handle operation fails
         */
-       public void replace(String pathInZooKeeper, int expectedVersion, T state) throws Exception {
+       @Override
+       public void replace(String pathInZooKeeper, ResourceVersion expectedVersion, T state) throws Exception {
                checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
                checkNotNull(state, "State");
+               checkArgument(
+                       expectedVersion instanceof IntegerResourceVersion,
+                       "Resource version type should be IntegerResourceVersion.");

Review comment:
       We could make the interface type-safe by introducing a new generic type parameter `R extends ResourceVersion`, giving `StateHandleStore<T, R>`.
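
A rough sketch of the generic variant, trimmed to two methods and backed by an in-memory map instead of ZooKeeper. Everything here is an assumption made for illustration: `VersionedStore`, `SketchResourceVersion`, `SketchIntegerVersion` and `InMemoryStore` are hypothetical stand-ins, not the PR's classes:

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ResourceVersion.
interface SketchResourceVersion extends Serializable {
    boolean isExisting();
}

// Hypothetical stand-in for IntegerResourceVersion.
final class SketchIntegerVersion implements SketchResourceVersion {
    final int value;

    SketchIntegerVersion(int value) {
        this.value = value;
    }

    @Override
    public boolean isExisting() {
        return value >= 0;
    }
}

// Stand-in for StateHandleStore<T, R>: the second type parameter ties a
// store to one concrete version type.
interface VersionedStore<T extends Serializable, R extends SketchResourceVersion> {
    R exists(String name) throws Exception;

    void replace(String name, R expectedVersion, T state) throws Exception;
}

// In-memory stand-in for a ZooKeeper-backed store.
final class InMemoryStore<T extends Serializable>
        implements VersionedStore<T, SketchIntegerVersion> {

    private final Map<String, T> states = new HashMap<>();
    private final Map<String, Integer> versions = new HashMap<>();

    void put(String name, T state) {
        states.put(name, state);
        versions.put(name, 0);
    }

    T get(String name) {
        return states.get(name);
    }

    @Override
    public SketchIntegerVersion exists(String name) {
        // -1 marks a missing entry, mirroring the sentinel used in the diff.
        return new SketchIntegerVersion(versions.getOrDefault(name, -1));
    }

    @Override
    public void replace(String name, SketchIntegerVersion expectedVersion, T state) {
        // No instanceof check or cast is needed: the compiler already
        // guarantees the version type.
        Integer current = versions.get(name);
        if (current == null || current != expectedVersion.value) {
            throw new IllegalStateException("Version mismatch for " + name);
        }
        states.put(name, state);
        versions.put(name, current + 1);
    }
}
```

The runtime `checkArgument` from the hunk above becomes unnecessary in this shape, because passing another version type is a compile error. The cost is the one raised in the other comment on this hunk: any class that wraps a store generically has to carry the extra type parameter as well.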

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/persistence/IntegerResourceVersion.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.persistence;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * {@link ResourceVersion} implementation with {@link Integer} value. The resource version in
+ * ZooKeeper is {@link Integer}.
+ */
+public class IntegerResourceVersion implements ResourceVersion, Comparable<IntegerResourceVersion> {
+
+       private static final IntegerResourceVersion NOT_EXIST = new IntegerResourceVersion(-1);
+
+       private final int value;
+
+       private IntegerResourceVersion(int value) {
+               this.value = value;
+       }
+
+       @Override
+       public int compareTo(@Nonnull IntegerResourceVersion other) {
+               return Integer.compare(value, other.getValue());
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj == this) {
+                       return true;
+               } else if (obj != null && obj.getClass() == IntegerResourceVersion.class) {
+                       final IntegerResourceVersion that = (IntegerResourceVersion) obj;
+                       return this.value == that.getValue();
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public int hashCode() {
+               return Integer.hashCode(value);
+       }
+
+       @Override
+       public boolean isNotExist() {
+               return this == NOT_EXIST;
+       }
+
+       @Override
+       public String toString() {
+               return "IntegerResourceVersion{" + "value='" + value + '\'' + '}';
+       }
+
+       public int getValue() {
+               return this.value;
+       }
+
+       public static IntegerResourceVersion notExist() {

Review comment:
       maybe `notExisting`

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.persistence;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Class which stores state via the provided {@link RetrievableStateStorageHelper} and writes the
+ * returned state handle to distributed coordination system(e.g. Zookeeper, Kubernetes, etc.).
+ *
+ * <p>To avoid concurrent modification issues, the implementation needs to ensure that only the leader could
+ * update the state store.
+ *
+ * @param <T> Type of state
+ */
+public interface StateHandleStore<T extends Serializable> {
+
+       /**
+        * Persist the state to distributed storage(e.g. S3, HDFS, etc.). And then creates a state handle, stores it in
+        * the distributed coordination system(e.g. ZooKeeper, Kubernetes, etc.).
+        *
+        * @param name Key name in ConfigMap or child path name in ZooKeeper
+        * @param state State to be added
+        *
+        * @throws AlreadyExistException if the name already exists
+        * @throws Exception if persisting state or writing state handle failed
+        */
+       RetrievableStateHandle<T> add(String name, T state) throws Exception;
+
+       /**
+        * Replaces a state handle in the distributed coordination system and discards the old state handle.
+        *
+        * @param name Key name in ConfigMap or child path name in ZooKeeper
+        * @param resourceVersion resource version of previous storage object. If the resource version does not match, the
+        * replace operation will fail. Since there is an unexpected update operation snuck in.
+        * @param state State to be replace with
+        *
+        * @throws NotExistException if the name does not exist
+        * @throws Exception if persisting state or writing state handle failed
+        */
+       void replace(String name, ResourceVersion resourceVersion, T state) throws Exception;
+
+       /**
+        * Returns resource version of state handle with specific name on the underlying storage.
+        *
+        * @param name Key name in ConfigMap or child path name in ZooKeeper
+        *
+        * @return current resource version on the underlying storage.
+        *
+        * @throws Exception if the check existence operation failed
+        */
+       ResourceVersion exists(String name) throws Exception;
+
+       /**
+        * Gets the {@link RetrievableStateHandle} stored with the given name.
+        *
+        * @param name Key name in ConfigMap or child path name in ZooKeeper
+        *
+        * @return The retrieved state handle
+        *
+        * @throws IOException if the method failed to deserialize the stored state handle
+        * @throws NotExistException when the name does not exist
+        * @throws Exception if get state handle failed
+        */
+       RetrievableStateHandle<T> get(String name) throws Exception;
+
+       /**
+        * Gets all available state handles from the storage.
+        *
+        * @return All retrieved state handles.
+        *
+        * @throws Exception if get state handle operation failed
+        */
+       List<Tuple2<RetrievableStateHandle<T>, String>> getAll() throws Exception;
+
+       /**
+        * Return a list of all valid name for state handles.
+        *
+        * @return List of valid state handle name. The name is key name in ConfigMap or child path name in ZooKeeper.
+        *
+        * @throws Exception if get handle operation failed
+        */
+       Collection<String> getAllHandles() throws Exception;
+
+       /**
+        * Remove the state handle and discard the state with given name.
+        *
+        * @param name Key name in ConfigMap or child path name in ZooKeeper
+        *
+        * @return True if the state handle could be removed.
+        *
+        * @throws Exception if removing the handles or discarding the state failed
+        */
+       boolean removeHandleAndDiscardState(String name) throws Exception;
+
+       /**
+        * Remove all the states. Not only the state handles in the distributed coordination system
+        * will be removed, but also the real state data on the distributed storage will be discarded.
+        *
+        * @throws Exception if removing the names or discarding the state failed
+        */
+       void removeAllHandlesAndDiscardState() throws Exception;
+
+       /**
+        * Only remove all the state handle pointers on Kubernetes or ZooKeeper.
+        *
+        * @throws Exception if removing the handles failed
+        */
+       void removeAllHandles() throws Exception;
+
+       /**
+        * The key does not exist in ConfigMap or the Zookeeper node does not exists.
+        */
+       class NotExistException extends Exception {

Review comment:
       nit: Maybe `NotExistingException` and `extends FlinkException`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]