tillrohrmann commented on a change in pull request #13871:
URL: https://github.com/apache/flink/pull/13871#discussion_r518116292
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
##########
@@ -209,60 +191,39 @@ public void recover() throws Exception {
}
}
- private boolean
haveAllDownloaded(List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>,
String>> checkpointPointers) {
- if (completedCheckpoints.size() != checkpointPointers.size()) {
- return false;
- }
- Set<Long> localIds =
completedCheckpoints.stream().map(CompletedCheckpoint::getCheckpointID).collect(Collectors.toSet());
- for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>,
String> initialCheckpoint : checkpointPointers) {
- if
(!localIds.contains(pathToCheckpointId(initialCheckpoint.f1))) {
- return false;
- }
- }
- return true;
- }
-
/**
- * Synchronously writes the new checkpoints to ZooKeeper and
asynchronously removes older ones.
+ * Synchronously writes the new checkpoints to state handle store and
asynchronously removes older ones.
*
* @param checkpoint Completed checkpoint to add.
*/
@Override
- public void addCheckpoint(final CompletedCheckpoint checkpoint,
CheckpointsCleaner checkpointsCleaner, Runnable postCleanup) throws Exception {
+ public void addCheckpoint(
+ final CompletedCheckpoint checkpoint,
+ CheckpointsCleaner checkpointsCleaner, Runnable
postCleanup) throws Exception {
Review comment:
nit: formatting of `postCleanup`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
##########
@@ -209,60 +191,39 @@ public void recover() throws Exception {
}
}
- private boolean
haveAllDownloaded(List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>,
String>> checkpointPointers) {
- if (completedCheckpoints.size() != checkpointPointers.size()) {
- return false;
- }
- Set<Long> localIds =
completedCheckpoints.stream().map(CompletedCheckpoint::getCheckpointID).collect(Collectors.toSet());
- for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>,
String> initialCheckpoint : checkpointPointers) {
- if
(!localIds.contains(pathToCheckpointId(initialCheckpoint.f1))) {
- return false;
- }
- }
- return true;
- }
-
/**
- * Synchronously writes the new checkpoints to ZooKeeper and
asynchronously removes older ones.
+ * Synchronously writes the new checkpoints to state handle store and
asynchronously removes older ones.
*
* @param checkpoint Completed checkpoint to add.
*/
@Override
- public void addCheckpoint(final CompletedCheckpoint checkpoint,
CheckpointsCleaner checkpointsCleaner, Runnable postCleanup) throws Exception {
+ public void addCheckpoint(
+ final CompletedCheckpoint checkpoint,
+ CheckpointsCleaner checkpointsCleaner, Runnable
postCleanup) throws Exception {
+
checkNotNull(checkpoint, "Checkpoint");
- final String path =
checkpointIdToPath(checkpoint.getCheckpointID());
+ final String path =
completedCheckpointStoreUtil.checkpointIDToName(checkpoint.getCheckpointID());
// Now add the new one. If it fails, we don't want to loose
existing data.
- checkpointsInZooKeeper.add(path, checkpoint);
+ checkpointStateHandleStore.add(path, checkpoint);
completedCheckpoints.addLast(checkpoint);
// Everything worked, let's remove a previous checkpoint if
necessary.
while (completedCheckpoints.size() >
maxNumberOfCheckpointsToRetain) {
final CompletedCheckpoint completedCheckpoint =
completedCheckpoints.removeFirst();
- tryRemoveCompletedCheckpoint(completedCheckpoint,
completedCheckpoint.shouldBeDiscardedOnSubsume(), checkpointsCleaner,
postCleanup);
+ tryRemoveCompletedCheckpoint(
+ completedCheckpoint,
+
completedCheckpoint.shouldBeDiscardedOnSubsume(),
+ checkpointsCleaner, postCleanup);
Review comment:
nit: formatting of `postCleanup`.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingRetrievableStateStorageHelper.java
##########
@@ -30,9 +34,28 @@
*/
public final class TestingRetrievableStateStorageHelper<T extends
Serializable> implements RetrievableStateStorageHelper<T> {
+ private FunctionWithException<T, T, IOException> retrieveStateFunction;
+
+ private RunnableWithException discardStateRunnable;
+
+ private Function<T, Long> getStateSizeFunction;
Review comment:
Can these fields be `final` or do we need to change them after having
created a `TestingRetrievableStateStorageHelper` instance?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingRetrievableStateStorageHelper.java
##########
@@ -41,22 +64,43 @@
private final T state;
- private TestingRetrievableStateHandle(T state) {
+ private FunctionWithException<T, T, IOException>
retrieveStateFunction;
+
+ private RunnableWithException discardStateRunnable;
+
+ private Function<T, Long> getStateSizeFunction;
Review comment:
Can these be `final`?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.testutils.FlinkMatchers;
+import org.apache.flink.runtime.persistence.StateHandleStore;
+import org.apache.flink.runtime.persistence.TestingStateHandleStore;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.FlinkException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link DefaultCompletedCheckpointStore}.
+ */
+public class DefaultCompletedCheckpointStoreTest {
Review comment:
`extends TestLogger` is missing from the class declaration.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingRetrievableStateStorageHelper.java
##########
@@ -41,22 +64,43 @@
private final T state;
- private TestingRetrievableStateHandle(T state) {
+ private FunctionWithException<T, T, IOException>
retrieveStateFunction;
+
+ private RunnableWithException discardStateRunnable;
+
+ private Function<T, Long> getStateSizeFunction;
Review comment:
If the fields can be `null`, then we should add `@Nullable`.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointStoreUtil.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.runtime.checkpoint.CheckpointStoreUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static
org.apache.flink.kubernetes.utils.Constants.CHECKPOINT_ID_KEY_PREFIX;
+
+/**
+ * {@link CheckpointStoreUtil} implementation for Kubernetes.
+ *
+ */
+public class KubernetesCheckpointStoreUtil implements CheckpointStoreUtil {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(KubernetesCheckpointStoreUtil.class);
+
+ /**
+ * Convert a checkpoint id into a ConfigMap key.
+ *
+ * @param checkpointId to convert to the key
+ *
+ * @return key created from the given checkpoint id
+ */
+ @Override
+ public String checkpointIDToName(long checkpointId) {
+ return CHECKPOINT_ID_KEY_PREFIX + String.format("%019d",
checkpointId);
+ }
+
+ /**
+ * Converts a key in ConfigMap to the checkpoint id.
+ *
+ * @param key in ConfigMap
+ *
+ * @return Checkpoint id parsed from the key
+ */
+ @Override
+ public long nameToCheckpointID(String key) {
+ try {
+ return
Long.parseLong(key.replace(CHECKPOINT_ID_KEY_PREFIX, ""));
Review comment:
`key.substring(CHECKPOINT_ID_KEY_PREFIX.length())` might be a bit faster.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointStoreUtil.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link CheckpointStoreUtil} implementation for ZooKeeper.
+ *
+ */
+public class ZooKeeperCheckpointStoreUtil implements CheckpointStoreUtil {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ZooKeeperCheckpointStoreUtil.class);
+
+ /**
+ * Convert a checkpoint id into a ZooKeeper path.
+ *
+ * @param checkpointId to convert to the path
+ * @return Path created from the given checkpoint id
+ */
+ @Override
+ public String checkpointIDToName(long checkpointId) {
+ return String.format("/%019d", checkpointId);
+ }
+
+ /**
+ * Converts a path to the checkpoint id.
+ *
+ * @param path in ZooKeeper
+ * @return Checkpoint id parsed from the path
+ */
+ @Override
+ public long nameToCheckpointID(String path) {
+ try {
+ String numberString;
+
+ // check if we have a leading slash
+ if ('/' == path.charAt(0)) {
+ numberString = path.substring(1);
+ } else {
+ numberString = path;
+ }
+ return Long.parseLong(numberString);
+ } catch (NumberFormatException e) {
+ LOG.warn("Could not parse checkpoint id from {}. This
indicates that the " +
+ "checkpoint id to path conversion has
changed.", path);
+
+ return -1L;
Review comment:
Maybe state in the JavaDocs of `CheckpointStoreUtil.nameToCheckpointID`
that `-1L` represents an invalid checkpoint id.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounter.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
+import
org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static
org.apache.flink.kubernetes.utils.Constants.CHECKPOINT_COUNTER_KEY;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link CheckpointIDCounter} implementation for Kubernetes. The counter will
be stored in
+ * JobManager-{@link org.apache.flink.api.common.JobID}-leader ConfigMap. The
key is
+ * {@link org.apache.flink.kubernetes.utils.Constants#CHECKPOINT_COUNTER_KEY},
+ * and value is counter value.
+ */
+public class KubernetesCheckpointIDCounter implements CheckpointIDCounter {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(KubernetesCheckpointIDCounter.class);
+
+ private final FlinkKubeClient kubeClient;
+
+ private final String configMapName;
+
+ private final String lockIdentity;
+
+ private volatile boolean running;
Review comment:
I think this class is not thread-safe: a shutdown that runs concurrently
with a `getAndIncrement` call might not leave the config map with a
cleaned up `CHECKPOINT_COUNTER_KEY` (e.g. if shutdown completes first and then
`getAndIncrement` completes). Given that, why does `running` need to be
`volatile`?
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
##########
@@ -353,6 +360,40 @@ private static String getLogging(String logFile, String
confDir, boolean hasLogb
return logging.toString();
}
+ /**
+ * Create a {@link DefaultCompletedCheckpointStore} with {@link
KubernetesStateHandleStore}.
+ *
+ * @param configuration configuration to build a
RetrievableStateStorageHelper
+ * @param kubeClient flink kubernetes client
+ * @param configMapName ConfigMap name
+ * @param executor executor to run blocking calls
+ * @param lockIdentity lock identity to check the leadership
+ * @param maxNumberOfCheckpointsToRetain max number of checkpoints to
retain on state store handle
+ *
+ * @return a {@link DefaultCompletedCheckpointStore} with {@link
KubernetesStateHandleStore}.
+ *
+ * @throws Exception when create the storage helper failed
+ */
+ public static CompletedCheckpointStore createCompletedCheckpointStore(
+ Configuration configuration,
+ FlinkKubeClient kubeClient,
+ Executor executor,
+ String configMapName,
+ String lockIdentity,
+ int maxNumberOfCheckpointsToRetain) throws Exception {
+
+ final RetrievableStateStorageHelper<CompletedCheckpoint>
stateStorage =
+ new
FileSystemStateStorageHelper<>(HighAvailabilityServicesUtils
+
.getClusterHighAvailableStoragePath(configuration),
COMPLETED_CHECKPOINT_FILE_SUFFIX);
+ final KubernetesStateHandleStore<CompletedCheckpoint>
stateHandleStore = new KubernetesStateHandleStore<>(
+ kubeClient, configMapName, stateStorage, k ->
k.startsWith(CHECKPOINT_ID_KEY_PREFIX), lockIdentity);
+ return new DefaultCompletedCheckpointStore(
+ maxNumberOfCheckpointsToRetain,
+ stateHandleStore,
+ new KubernetesCheckpointStoreUtil(),
Review comment:
Could this be a singleton using `enum`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org