zentol opened a new pull request #8687: [FLINK-12612][coordination] Track stored partition on the TaskExecutor
URL: https://github.com/apache/flink/pull/8687

Based on #8630, #8654 and #8680.

## What is the purpose of the change

With this PR, TaskExecutors keep track of which partitions are stored locally and use this information to
a) delay the termination of their jobmanager connection until all of these partitions are released, and
b) release all partitions for a given job if the jobmanager connection is terminated unexpectedly.

To track the partitions a `JobAwareShuffleEnvironment` was introduced, which is an alternate version of the `ShuffleEnvironment` interface. This environment is aware of the concept of jobs, something that was refactored out of the shuffle environment over the past weeks. The default implementation simply wraps a `ShuffleEnvironment` and adds additional book-keeping for partitions. The book-keeping is primarily kept up-to-date via listeners that are introduced into the `ResultPartitionWriters` by wrapping the writers returned by the backing environment.

### General Idea

To explain how things work, let's imagine we have just submitted a job:

1. Upon connecting to a JobManager, the TaskExecutor uses `#markJobActive` to inform the environment about jobs it should keep partitions for. If this flag is not set for a given job, any partition that finishes will be automatically released; see the special-cases section for an explanation.
2. Tasks are submitted, and their `ResultPartitionWriters` are created. The `JobAwareShuffleEnvironment` wraps the writers to introduce listeners that keep it in the loop about lifecycle updates for the partitions: whether they were set up, finished and/or failed.
3. Tasks start running, `ResultPartitionWriters` are set up, and the environment adds the created partitions to the book-keeping as in-progress partitions.
4. If a task fails, it fails its `ResultPartitionWriters` (which releases the partitions), and the environment removes the failed partitions from the book-keeping.
5. If a task finishes, it finishes its `ResultPartitionWriters`, and the environment now considers these partitions finished. From this point on the TaskExecutor is responsible for issuing the release call.
6. At some point an external actor issues a call to the TaskExecutor to release partitions, which the TaskExecutor forwards to the environment. The partitions are released and removed from the book-keeping.
7. If no more partitions and slots are held for a given job, the TaskExecutor terminates its connection with the JobManager.
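To make the flow above easier to follow, here is a minimal sketch of the job-facing surface such an environment could expose. Only `#markJobActive` is mentioned in this description; the remaining names and signatures are assumptions made purely for illustration.

```java
import java.util.Collection;

// Illustrative sketch only; apart from markJobActive, the names and signatures
// below are assumptions and not necessarily what this PR introduces.
interface JobAwarePartitionTracking<J, P> {

    // Step 1: called when the TaskExecutor connects to a JobManager; finished
    // partitions of an active job are retained until explicitly released.
    void markJobActive(J job);

    // Special case: called when the jobmanager connection is lost; partitions
    // that finish for an inactive job are released immediately.
    void markJobInactive(J job);

    // Step 6: an external actor asks the TaskExecutor to release partitions,
    // and the TaskExecutor forwards the request here.
    void releasePartitions(J job, Collection<P> partitions);

    // Step 7: the TaskExecutor keeps the jobmanager connection alive as long
    // as this returns true or slots are still allocated for the job.
    boolean hasPartitionsOccupyingLocalResources(J job);
}
```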
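The book-keeping itself is driven by wrapping the writers handed out by the backing environment. The sketch below shows the general wrapping pattern with simplified stand-in types (`PartitionWriter`, `PartitionLifecycleListener`); it does not reproduce Flink's actual `ResultPartitionWriter` interface.

```java
// Simplified stand-in types for illustration; not the actual Flink interfaces.
interface PartitionWriter {
    String getPartitionId();
    void setup();
    void finish();
    void fail(Throwable cause);
}

interface PartitionLifecycleListener {
    void onPartitionSetup(String partitionId);
    void onPartitionFinished(String partitionId);
    void onPartitionFailed(String partitionId);
}

// Wrapper returned to tasks instead of the raw writer: every lifecycle
// transition is forwarded to the environment's book-keeping via the listener.
final class NotifyingPartitionWriter implements PartitionWriter {
    private final PartitionWriter delegate;
    private final PartitionLifecycleListener listener;

    NotifyingPartitionWriter(PartitionWriter delegate, PartitionLifecycleListener listener) {
        this.delegate = delegate;
        this.listener = listener;
    }

    @Override
    public String getPartitionId() {
        return delegate.getPartitionId();
    }

    @Override
    public void setup() {
        delegate.setup();
        listener.onPartitionSetup(delegate.getPartitionId()); // step 3: track as in-progress
    }

    @Override
    public void finish() {
        delegate.finish();
        listener.onPartitionFinished(delegate.getPartitionId()); // step 5: release responsibility moves to the TaskExecutor
    }

    @Override
    public void fail(Throwable cause) {
        delegate.fail(cause);
        listener.onPartitionFailed(delegate.getPartitionId()); // step 4: drop from the book-keeping
    }
}
```

Because tasks only ever see the wrapped writer, the book-keeping stays in sync without the tasks (or the TaskExecutor) having to know exactly when partitions are set up, finished or failed.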
### Abnormal events / Special cases

The big special case is losing the connection to the jobmanager. In this case the TaskExecutor will cancel all tasks, mark the job as inactive, and release all finished partitions it has _at this point in time_. Partitions that are in-progress will NOT be cleaned up by the TaskExecutor, so as not to interfere with the normal shutdown procedure of tasks. Since the task cancellation happens asynchronously, releasing in-progress partitions would likely result in undefined and misleading exceptions. In the normal cancellation case the task will fail all of its partitions and they will be cleaned up automatically.

However, it can happen that a task finishes after the job has already been canceled, in which case it will actually finish its partitions. This is where the isActive flag for jobs comes in: the flag is unset right away when the job is canceled, and any partition that is finished for a job that isn't active is automatically released by the `JobAwareShuffleEnvironment`.

### The why-is-it-implemented-like-this section

#### What about concurrency?

The low-level data structure for keeping track of partitions is the `ResultPartitionTable`, which is thread-safe. This class is accessed by Task threads via listeners, and by TM threads when release calls have been issued. This is where the majority of concurrent accesses will take place. (A sketch of such a table can be found at the end of this description.)

In the `JobAwareShuffleEnvironment` there is one additional point of concurrent access: the set of active jobs, which is queried by task threads when finishing partitions and updated by the TM thread when establishing/terminating jobmanager connections. No additional locks have been introduced, so there should not be any risk of deadlocks.

#### Is it safe to have the Task thread release partitions if the job is inactive?

Yes, as this is equivalent to failing a partition, which Task threads already do.

#### Could we not handle this on the TaskExecutor level, without wrappers?

Potentially, but the problem here is that the TaskExecutor is quite far away from what is actually happening in the task, and keeping the book-keeping in sync introduces a few challenges. For example, there is no good point at which one can add partitions to the book-keeping. Partitions are set up within the task's `run()` method, at _some point_ after it was started. Without introducing a callback in the Task you inevitably end up in an inconsistent state, and have to ensure that _whatever happens in the task_ this situation is rectified eventually. This introduces several new edge cases that we could do without. (For example, a task can be canceled right away, in which case it neither sets up partitions nor issues any callback regarding its final state.)

#### Could we not handle this inside the ShuffleEnvironment?

Yes, we certainly could. But this would require re-introducing the concept of jobs into the environment, which was removed recently, and would additionally require every implementation to implement this logic correctly, even though it would be the same across all implementations.
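As a rough illustration of the concurrency discussion above, here is a minimal sketch of what a thread-safe per-job partition table could look like. The class and method names are made up for this example and do not mirror the actual `ResultPartitionTable`.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not the actual ResultPartitionTable: Task threads add and
// remove entries via listeners, while the TM thread removes entries when release
// calls arrive or the jobmanager connection is lost.
final class PartitionTable<J, P> {
    private final Map<J, Set<P>> partitionsPerJob = new ConcurrentHashMap<>();

    // Called from Task threads when a partition has been set up.
    void startTracking(J job, P partition) {
        partitionsPerJob.compute(job, (ignored, partitions) -> {
            Set<P> set = partitions == null ? ConcurrentHashMap.newKeySet() : partitions;
            set.add(partition);
            return set;
        });
    }

    // Called from Task threads (failed partitions) or the TM thread (release calls).
    void stopTracking(J job, P partition) {
        partitionsPerJob.computeIfPresent(job, (ignored, partitions) -> {
            partitions.remove(partition);
            return partitions.isEmpty() ? null : partitions;
        });
    }

    // Called from the TM thread, e.g. when the jobmanager connection is lost.
    Collection<P> stopTrackingAll(J job) {
        Set<P> removed = partitionsPerJob.remove(job);
        return removed == null ? Collections.emptySet() : removed;
    }

    // Could be used to decide whether the jobmanager connection may be terminated.
    boolean hasTrackedPartitions(J job) {
        return partitionsPerJob.containsKey(job);
    }
}
```

All updates go through the `ConcurrentHashMap`'s atomic compute methods, so no extra locking is required, matching the "no additional locks" point above.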
