Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5239#discussion_r168489132
--- Diff:
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
---
@@ -1195,22 +1202,15 @@ class TaskManager(
config.getTimeout().getSize(),
config.getTimeout().getUnit()))
- //TODO: make configurable, this is just the future fallback case,
- // integrate with
ConfigConstants.TASK_MANAGER_LOCAL_STATE_ROOT_DIR_KEY
- // TODO: wire this so that the manager survives the end of the task
- //TODO create "localState" sub-directories!!!
- val taskExecutorLocalStateStoresManager =
- new
TaskExecutorLocalStateStoresManager(ioManager.getSpillingDirectories)
-
- val localStateStore =
taskExecutorLocalStateStoresManager.localStateStoreForTask(
+ val taskLocalStateStore =
taskManagerLocalStateStoresManager.localStateStoreForTask(
jobInformation.getJobId,
taskInformation.getJobVertexId,
tdd.getSubtaskIndex)
- val slotStateManager = new TaskStateManagerImpl(
+ val taskLocalStateManager = new TaskStateManagerImpl(
--- End diff --
maybe rename to `taskStateManager`
---