Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r168501090
  
    --- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
    @@ -1195,22 +1202,15 @@ class TaskManager(
               config.getTimeout().getSize(),
               config.getTimeout().getUnit()))
     
    -      //TODO: make configurable, this is just the future fallback case,
    -      // integrate with 
ConfigConstants.TASK_MANAGER_LOCAL_STATE_ROOT_DIR_KEY
    -      // TODO: wire this so that the manager survives the end of the task
    -      //TODO create "localState" sub-directories!!!
    -      val taskExecutorLocalStateStoresManager =
    -        new 
TaskExecutorLocalStateStoresManager(ioManager.getSpillingDirectories)
    -
    -      val localStateStore = 
taskExecutorLocalStateStoresManager.localStateStoreForTask(
    +      val taskLocalStateStore = 
taskManagerLocalStateStoresManager.localStateStoreForTask(
             jobInformation.getJobId,
             taskInformation.getJobVertexId,
             tdd.getSubtaskIndex)
     
    -      val slotStateManager = new TaskStateManagerImpl(
    +      val taskLocalStateManager = new TaskStateManagerImpl(
    --- End diff --
    
    👍 


---

Reply via email to