[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372623#comment-16372623 ]
ASF GitHub Bot commented on FLINK-8360:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r169909152

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class represents a directory that is the target for a state snapshot. This class provides some method that
+ * simplify resource management when dealing with such directories, e.g. it can produce a {@link DirectoryStateHandle}
+ * when the snapshot is completed and disposal considers whether or not a snapshot was already completed. For a
+ * completed snapshot, the ownership for cleanup is transferred to the created directory state handle. For incomplete
+ * snapshots, calling {@link #deleteIfIncompleteSnapshot()} will delete the underlying directory resource.
+ */
+public class SnapshotDirectory {
+
+	/**
+	 * Lifecycle stages of a snapshot directory.
+	 */
+	enum State {
+		ONGOING, COMPLETED, DELETED
+	}
+
+	/** This path describes the underlying directory for the snapshot. */
+	@Nonnull
+	private final Path directory;
+
+	/** The filesystem that contains the snapshot directory. */
+	@Nonnull
+	private final FileSystem fileSystem;
+
+	/** This reference tracks the lifecycle state of the snapshot directory. */
+	@Nonnull
+	private AtomicReference<State> state;
+
+	public SnapshotDirectory(@Nonnull Path directory, @Nonnull FileSystem fileSystem) {
+		this.directory = directory;
+		this.fileSystem = fileSystem;
+		this.state = new AtomicReference<>(State.ONGOING);
+	}
+
+	public SnapshotDirectory(@Nonnull Path directory) throws IOException {
+		this(directory, directory.getFileSystem());
+	}
+
+	@Nonnull
+	public Path getDirectory() {
+		return directory;
+	}
+
+	public boolean mkdirs() throws IOException {
+		return fileSystem.mkdirs(directory);
+	}
+
+	@Nonnull
+	public FileSystem getFileSystem() {
+		return fileSystem;
+	}
+
+	public boolean exists() throws IOException {
+		return fileSystem.exists(directory);
+	}
+
+	/**
+	 * List the statuses of the files/directories in the snapshot directory.
+	 *
+	 * @return the statuses of the files/directories in the given path.
+	 * @throws IOException if there is a problem creating the file statuses.
+	 */
+	public FileStatus[] listStatus() throws IOException {
+		return fileSystem.listStatus(directory);
+	}
+
+	/**
+	 * Calling this method completes the snapshot into the snapshot directory and creates a corresponding
+	 * {@link DirectoryStateHandle} that points to the snapshot directory. Calling this method will also change the
+	 * lifecycle state from "ongoing" to "completed". If the state was already deleted, an {@link IOException} is
+	 * thrown.
+	 *
+	 * @return a directory state handle that points to the snapshot directory.
+	 * @throws IOException if the state of this snapshot directory object is different from "ongoing".
+	 */
+	public DirectoryStateHandle completeSnapshotAndGetHandle() throws IOException {
+		if (state.compareAndSet(State.ONGOING, State.COMPLETED)) {
+			return new DirectoryStateHandle(directory, fileSystem);
+		} else {
+			throw new IOException("Expected state " + State.ONGOING + " but found state " + state.get());
+		}
+	}
+
+	/**
+	 * Calling this method will attempt delete the underlying snapshot directory recursively, if the state was
+	 * "ongoing". In this case, the state will be set to "deleted" as a result of this call.
+	 *
+	 * @return <code>true</code> if delete is successful, <code>false</code> otherwise.
+	 * @throws IOException if an exception happens during the delete.
+	 */
+	public boolean deleteIfIncompleteSnapshot() throws IOException {
+		return state.compareAndSet(State.ONGOING, State.DELETED) && fileSystem.delete(directory, true);
+	}
+
+	public boolean isSnapshotOngoing() {
+		return State.ONGOING.equals(state.get());
--- End diff --

I think:
- This also never throws `NPE` if you call `equals` on the constant (that is why I emphasized this).
- True.
- Can be optimized to the same thing.

Pro points I see for the other variant:
- It does not make you wonder, even for a second, whether `==` is correct in the comparison.
- It still works correctly after refactoring, e.g. when the type is changed from `enum` to `String` constants and somebody forgets to adjust the statement.
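The lifecycle logic under review can be reduced to a dependency-free sketch. `LocalSnapshotDir` and its method names are hypothetical, not Flink API; the sketch keeps only the `AtomicReference` state machine, does not touch any file system, and throws an unchecked `IllegalStateException` where the original throws `IOException`. Because `compareAndSet` only succeeds from ONGOING, at most one of "complete" or "delete" can win for a given directory, and both comparison styles discussed above are null-safe for an enum.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, dependency-free sketch of the SnapshotDirectory lifecycle.
// It tracks state only; the real class also creates and deletes a directory.
class LocalSnapshotDir {

    enum State { ONGOING, COMPLETED, DELETED }

    private final AtomicReference<State> state = new AtomicReference<>(State.ONGOING);

    // Mirrors completeSnapshotAndGetHandle(): only an ongoing snapshot can be completed.
    // The real method returns a DirectoryStateHandle and throws IOException instead.
    void complete() {
        if (!state.compareAndSet(State.ONGOING, State.COMPLETED)) {
            throw new IllegalStateException(
                "Expected state " + State.ONGOING + " but found state " + state.get());
        }
    }

    // Mirrors deleteIfIncompleteSnapshot(): true only if this call moved ONGOING -> DELETED.
    // The real method additionally requires fileSystem.delete(directory, true) to succeed.
    boolean deleteIfIncomplete() {
        return state.compareAndSet(State.ONGOING, State.DELETED);
    }

    // Constant-first equals: never throws NPE, even if state.get() returned null.
    boolean isOngoing() {
        return State.ONGOING.equals(state.get());
    }

    // Identity comparison: also null-safe, and equivalent to equals() for enum constants.
    boolean isOngoingByIdentity() {
        return state.get() == State.ONGOING;
    }

    State current() {
        return state.get();
    }
}
```

Note that in the original `deleteIfIncompleteSnapshot()`, if the state transition succeeds but `fileSystem.delete(directory, true)` returns `false`, the method returns `false` while the state is already DELETED. On the refactoring point: `==` on `String` compares references, so two equal but distinct `String` instances compare unequal, whereas `equals` keeps working.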
Anyway, I will not fight over such a minor thing and can just change it ;)

> Implement task-local state recovery
> -----------------------------------
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main
> idea is to have a secondary, local copy of the checkpointed state, while
> there is still a primary copy in DFS that we report to the checkpoint
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available,
> to save network bandwidth. This requires that the assignment from tasks to
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and
> can easily extend it to all other state types (e.g. operator state) later.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
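The recovery path described in the issue (prefer the secondary local copy when it is available, otherwise use the primary DFS copy) can be sketched as a small decision function. All names here (`StateCopy`, `LocalFirstRestore`, `restore`) are hypothetical; the sketch models only the fallback logic under these assumptions, not Flink's actual recovery code or state handles.

```java
// Hypothetical model of one copy of a task's checkpointed state.
interface StateCopy {
    // Returns the stored state; throws an unchecked exception if the copy is unusable.
    byte[] read();
}

// Hypothetical sketch of "local copy first, primary DFS copy as fallback" recovery.
final class LocalFirstRestore {

    // Restored state plus which copy it came from.
    static final class Result {
        final byte[] state;
        final boolean fromLocal;

        Result(byte[] state, boolean fromLocal) {
            this.state = state;
            this.fromLocal = fromLocal;
        }
    }

    private LocalFirstRestore() {}

    // localCopy may be null, e.g. when the task was not scheduled back to the same slot.
    static Result restore(StateCopy localCopy, StateCopy primaryCopy) {
        if (localCopy != null) {
            try {
                // Reading the secondary local copy avoids transferring state over the network.
                return new Result(localCopy.read(), true);
            } catch (RuntimeException e) {
                // The local copy is only an optimization: on any failure, fall back.
            }
        }
        // The primary copy in DFS, reported to the checkpoint coordinator, is authoritative.
        return new Result(primaryCopy.read(), false);
    }
}
```

Swallowing the local failure rather than propagating it follows from the issue's framing: the local copy exists only to save network bandwidth, so losing it should cost performance, not correctness.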