[ 
https://issues.apache.org/jira/browse/FLINK-5820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348522#comment-16348522
 ] 

ASF GitHub Bot commented on FLINK-5820:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5396#discussion_r165344021
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
 ---
    @@ -19,58 +19,53 @@
     package org.apache.flink.runtime.checkpoint;
     
     import org.apache.flink.api.common.JobID;
    -import org.apache.flink.core.fs.Path;
     import org.apache.flink.core.testutils.CommonTestUtils;
     import org.apache.flink.runtime.jobgraph.JobStatus;
     import org.apache.flink.runtime.jobgraph.JobVertexID;
     import org.apache.flink.runtime.jobgraph.OperatorID;
     import org.apache.flink.runtime.state.SharedStateRegistry;
    -import org.apache.flink.runtime.state.StreamStateHandle;
    -import org.apache.flink.runtime.state.filesystem.FileStateHandle;
    +import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
    +import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
     
     import org.junit.Rule;
     import org.junit.Test;
     import org.junit.rules.TemporaryFolder;
     
    -import java.io.File;
     import java.util.Collections;
     import java.util.HashMap;
     import java.util.Map;
     
     import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
     import static org.mockito.Mockito.mock;
     import static org.mockito.Mockito.times;
     import static org.mockito.Mockito.verify;
     
    +/**
    + * Unit tests for the {@link CompletedCheckpoint}.
    + */
     public class CompletedCheckpointTest {
     
        @Rule
        public final TemporaryFolder tmpFolder = new TemporaryFolder();
     
    -   /**
    -    * Tests that persistent checkpoints discard their header file.
    -    */
        @Test
    -   public void testDiscard() throws Exception {
    -           File file = tmpFolder.newFile();
    -           assertEquals(true, file.exists());
    -
    +   public void registerStatesAtRegistry() {
    --- End diff --
    
    The test whether state handles are correctly registered at the 
SharedStateRegistry was originally just sneakily added to a pre-existing 
metadata file cleanup test. That did not seem right ;-)
    
    This factors the test out into a separate method. The test method should be 
called `testRegisterStatesAtRegistry` instead of `registerStatesAtRegistry`. 
Will change that...


> Extend State Backend Abstraction to support Global Cleanup Hooks
> ----------------------------------------------------------------
>
>                 Key: FLINK-5820
>                 URL: https://issues.apache.org/jira/browse/FLINK-5820
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.2.0
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
> The current state backend abstraction has the limitation that each piece of 
> state is only meaningful in the context of its state handle. There is no 
> possibility of a view onto "all state associated with checkpoint X".
> That causes several issues
>   - State might not be cleaned up in the process of failures. When a 
> TaskManager hands over a state handle to the JobManager and either of them 
> has a failure, the state handle may be lost and state lingers.
>   - State might also linger if a cleanup operation failed temporarily, and 
> the checkpoint metadata was already disposed
>   - State cleanup is more expensive than necessary in many cases. Each state 
> handle is individually released. For large jobs, this means 1000s of release 
> operations (typically file deletes) per checkpoint, which can be expensive on 
> some file systems.
>   - It is hard to guarantee cleanup of parent directories with the current 
> architecture.
> The core changes proposed here are:
>   1. Each job has one core {{StateBackend}}. In the future, operators may 
> have different {{KeyedStateBackends}} and {{OperatorStateBackends}} to mix 
> and match for example RocksDB storabe and in-memory storage.
>   2. The JobManager needs to be aware of the {{StateBackend}}.
>   3. Storing checkpoint metadata becomes responsibility of the state backend, 
> not the "completed checkpoint store". The later only stores the pointers to 
> the available latest checkpoints (either in process or in ZooKeeper).
>   4. The StateBackend may optionally have a hook to drop all checkpointed 
> state that belongs to only one specific checkpoint (shared state comes as 
> part of incremental checkpointing).
>   5.  The StateBackend needs to have a hook to drop all checkpointed state up 
> to a specific checkpoint (for all previously discarded checkpoints).
>   6. In the future, this must support periodic cleanup hooks that track 
> orphaned shared state from incremental checkpoints.
> For the {{FsStateBackend}}, which stores most of the checkpointes state 
> currently (transitively for RocksDB as well), this means a re-structuring of 
> the storage directories as follows:
> {code}
> ../<flink-checkpoints>/job1-id/
>                               /shared/    <-- shared checkpoint data
>                               /chk-1/...  <-- data exclusive to checkpoint 1
>                               /chk-2/...  <-- data exclusive to checkpoint 2
>                               /chk-3/...  <-- data exclusive to checkpoint 3
> ../<flink-checkpoints>/job2-id/
>                               /shared/...
>                               /chk-1/...
>                               /chk-2/...
>                               /chk-3/...
> ../<flink-savepoints>/savepoint-1/savepoint-root
>                                  /file-1-uid
>                                  /file-2-uid
>                                  /file-3-uid
>                      /savepoint-2/savepoint-root
>                                  /file-1-uid
>                                  /file-2-uid
>                                  /file-3-uid
> {code}
> This is the umbrella issue for the individual steps needed to address this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to