[ https://issues.apache.org/jira/browse/FLINK-2976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15045233#comment-15045233 ]
ASF GitHub Bot commented on FLINK-2976:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1434#discussion_r46845520

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/HeapStateStore.java ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Java heap backed {@link StateStore}.
+ *
+ * @param <T> Type of state
+ */
+class HeapStateStore<T extends Serializable> implements StateStore<T> {
+
+    private final Map<String, T> stateMap = new HashMap<>();
+
+    private final AtomicInteger idCounter = new AtomicInteger();
+
+    @Override
+    public String putState(T state) throws Exception {
+        checkNotNull(state, "State");
+
+        String key = "jobmanager://savepoints/" + idCounter.incrementAndGet();
--- End diff --

Why do we use an `AtomicInteger` here? Does this mean that `putState` will be accessed concurrently?
If this is the case, then `stateMap.put` is problematic, since a `HashMap` is not thread safe.

> Save and load checkpoints manually
> ----------------------------------
>
>                 Key: FLINK-2976
>                 URL: https://issues.apache.org/jira/browse/FLINK-2976
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Runtime
>    Affects Versions: 0.10.0
>            Reporter: Ufuk Celebi
>             Fix For: 1.0.0
>
>
> Currently, all checkpointed state is bound to a job. After the job finishes,
> all state is lost. In case of an HA cluster, jobs can live longer than the
> cluster, but they still suffer from the same issue when they finish.
> Multiple users have requested the feature to manually save a checkpoint in
> order to resume from it at a later point. This is especially important for
> production environments. As an example, consider upgrading your existing
> production Flink program. Currently, you lose all the state of your program.
> With the proposed mechanism, it will be possible to save a checkpoint, stop
> and update your program, and then continue your program with the checkpoint.
> The required operations can be simple:
> saveCheckpoint(JobID) => checkpointID: long
> loadCheckpoint(JobID, long) => void
> For the initial version, I would apply the following restriction:
> - The topology needs to stay the same (JobGraph parallelism, etc.)
> A user can configure this behaviour via the environment, in the same way as
> the checkpointing interval. Furthermore, the user can trigger the save
> operation via the command line at arbitrary times and load a checkpoint when
> submitting a job, e.g.
> bin/flink checkpoint <JobID> => checkpointID: long
> and
> bin/flink run --loadCheckpoint JobID [latest saved checkpoint]
> bin/flink run --loadCheckpoint (JobID,long) [specific saved checkpoint]
> As far as I can tell, the required mechanisms are similar to the ones
> implemented for JobManager high availability.
> We need to make sure to persist the CompletedCheckpoint instances as a
> pointer to the checkpoint state and to *not* remove saved checkpoint state.
> On the client side, we need to give the job and its vertices the same IDs to
> allow mapping the checkpoint state.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
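The review question can be made concrete. If `putState` may really be called from several threads, both the key generation and the map insertion have to be thread safe; the `AtomicInteger` only covers the first. The following is a minimal sketch of one possible fix under that assumption, not the code in the pull request: `SimpleStateStore` is a hypothetical stand-in for Flink's `StateStore` (only `putState` is visible in the diff), `getState`, `size` and `concurrentPuts` are illustrative additions, and `Objects.requireNonNull` replaces Guava's `checkNotNull` to keep the example dependency-free.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentHeapStateStoreDemo {

    /** Hypothetical stand-in for Flink's StateStore; only putState appears in the diff. */
    interface SimpleStateStore<T extends Serializable> {
        String putState(T state) throws Exception;

        T getState(String key) throws Exception;
    }

    /**
     * Heap-backed store that tolerates concurrent putState calls: the AtomicInteger
     * hands out unique ids and the ConcurrentHashMap makes the insertion thread safe.
     */
    static final class ConcurrentHeapStateStore<T extends Serializable> implements SimpleStateStore<T> {
        private final ConcurrentMap<String, T> stateMap = new ConcurrentHashMap<>();
        private final AtomicInteger idCounter = new AtomicInteger();

        @Override
        public String putState(T state) {
            Objects.requireNonNull(state, "State");
            String key = "jobmanager://savepoints/" + idCounter.incrementAndGet();
            stateMap.put(key, state);
            return key;
        }

        @Override
        public T getState(String key) {
            T state = stateMap.get(key);
            if (state == null) {
                throw new IllegalArgumentException("Unknown key: " + key);
            }
            return state;
        }

        int size() {
            return stateMap.size();
        }
    }

    /** Calls putState from several threads and returns how many entries ended up in the store. */
    static int concurrentPuts(int threads, int perThread) throws Exception {
        ConcurrentHeapStateStore<String> store = new ConcurrentHeapStateStore<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int t = 0; t < threads; t++) {
                final int id = t;
                futures.add(pool.submit(() -> {
                    for (int i = 0; i < perThread; i++) {
                        store.putState("state-" + id + "-" + i);
                    }
                }));
            }
            for (Future<?> f : futures) {
                f.get(); // wait for each worker and propagate any failure
            }
        } finally {
            pool.shutdown();
        }
        return store.size();
    }

    public static void main(String[] args) throws Exception {
        // Every put received a distinct key and no insertion was lost.
        System.out.println(concurrentPuts(8, 1000)); // prints 8000
    }
}
```

The design choice is the point of the review: unique keys from the `AtomicInteger` do not make `HashMap.put` safe, so a `ConcurrentHashMap` (or a single lock guarding both the counter and the map) is what closes the gap. Conversely, if `putState` is only ever invoked from one thread, a plain `HashMap` with an `int` counter would suffice and the `AtomicInteger` adds nothing.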