zentol opened a new pull request #8687: [FLINK-12612][coordination] Track stored partition on the TaskExecutor
URL: https://github.com/apache/flink/pull/8687
 
 
   Based on #8630, #8654 and #8680.
   
   ## What is the purpose of the change
   
   With this PR, TaskExecutors keep track of which partitions are stored locally, and use this information to
   a) delay the termination of their JobManager connection until all these partitions are released, and
   b) release all partitions for a given job if the JobManager connection is terminated unexpectedly.
   
   To track the partitions, a `JobAwareShuffleEnvironment` was introduced, which is an alternative version of the `ShuffleEnvironment` interface. This environment is aware of the concept of jobs, something that was refactored out of the shuffle environment over the past weeks.
   The default implementation simply wraps a `ShuffleEnvironment` and adds additional book-keeping for partitions.
   
   The book-keeping is primarily kept up-to-date via listeners that are 
introduced into the `ResultPartitionWriters` by wrapping the writers returned 
by the backing environment.
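   To illustrate the wrapping, here is a minimal sketch of a writer that notifies a listener. `PartitionWriter`, `PartitionLifecycleListener` and the classes below are hypothetical simplifications, not Flink's actual `ResultPartitionWriter` API; the sketch only shows the decorator idea of delegating each call to the backing writer and then informing the book-keeping.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified stand-in for a ResultPartitionWriter.
interface PartitionWriter {
    String partitionId();
    void setup();
    void finish();
    void fail(Throwable cause);
}

// Hypothetical callback through which the wrapping environment learns about lifecycle changes.
interface PartitionLifecycleListener {
    void onSetup(String partitionId);
    void onFinish(String partitionId);
    void onFail(String partitionId);
}

// Decorator: delegates to the writer returned by the backing environment,
// then notifies the listener so the book-keeping stays in sync.
final class ListeningPartitionWriter implements PartitionWriter {
    private final PartitionWriter delegate;
    private final PartitionLifecycleListener listener;

    ListeningPartitionWriter(PartitionWriter delegate, PartitionLifecycleListener listener) {
        this.delegate = delegate;
        this.listener = listener;
    }

    @Override public String partitionId() { return delegate.partitionId(); }

    @Override public void setup() {
        delegate.setup();
        listener.onSetup(delegate.partitionId());
    }

    @Override public void finish() {
        delegate.finish();
        listener.onFinish(delegate.partitionId());
    }

    @Override public void fail(Throwable cause) {
        delegate.fail(cause);
        listener.onFail(delegate.partitionId());
    }
}

// Demo writer that only records which calls it received.
final class NoOpPartitionWriter implements PartitionWriter {
    private final String id;
    final List<String> calls = new ArrayList<>();

    NoOpPartitionWriter(String id) { this.id = id; }

    @Override public String partitionId() { return id; }
    @Override public void setup() { calls.add("setup"); }
    @Override public void finish() { calls.add("finish"); }
    @Override public void fail(Throwable cause) { calls.add("fail"); }
}
```

   Because the listener is only notified after the delegate call returns, a setup that throws is never recorded as an in-progress partition.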
   
   ### General Idea
   To explain how things work, let's imagine we have just submitted a job:
   
   Upon connecting to a JobManager, the TaskExecutor uses `#markJobActive` to inform the environment about jobs it should keep partitions for. If this flag is not set for a given job, any partition that finishes will be automatically released. See the special-cases section for an explanation.
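   A rough sketch of the active-job check, assuming plain `String` IDs (Flink uses dedicated types such as `JobID` and `ResultPartitionID`) and a `Consumer` as a stand-in for the actual release call. `ActiveJobTracker`, `markJobInactive` and `onPartitionFinished` are hypothetical names; only `#markJobActive` appears in this PR. A concurrent set is used because the TM thread updates it while task threads query it.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical sketch of the "keep partitions only for active jobs" rule.
final class ActiveJobTracker {
    private final Set<String> activeJobs = ConcurrentHashMap.newKeySet();
    private final Consumer<String> releaser; // stand-in for releasing a partition by ID

    ActiveJobTracker(Consumer<String> releaser) {
        this.releaser = releaser;
    }

    // TM thread: a JobManager connection for this job was established.
    void markJobActive(String jobId) {
        activeJobs.add(jobId);
    }

    // TM thread: the JobManager connection for this job was terminated.
    void markJobInactive(String jobId) {
        activeJobs.remove(jobId);
    }

    // Task thread: a partition finished. Returns true if it is kept, false if it was released.
    boolean onPartitionFinished(String jobId, String partitionId) {
        if (activeJobs.contains(jobId)) {
            return true;
        }
        releaser.accept(partitionId);
        return false;
    }
}
```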
   
   Tasks are being submitted, and their `ResultPartitionWriters` are being created. The `JobAwareShuffleEnvironment` wraps the writers to introduce listeners that keep it in the loop about lifecycle updates for the partitions: whether they were set up, finished and/or failed.
   
   Tasks start running, their `ResultPartitionWriters` are set up, and the environment adds the created partitions to the book-keeping as in-progress partitions.
   
   If a task fails, it will fail its `ResultPartitionWriters` (which releases the partitions), and the environment removes the failed partitions from the book-keeping.
   
   If a task finishes, it will finish its `ResultPartitionWriters`, and the environment now considers these partitions finished. From this point on, the TaskExecutor is responsible for issuing the release call.
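   The lifecycle above boils down to a small per-partition state machine. The following is a hypothetical model of it (`PartitionBookkeeping` and `PartitionState` are illustrative names, not the PR's `ResultPartitionTable`): setup records a partition as in-progress, failure drops the entry, and finishing marks it as finished.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical states: failed partitions are simply removed, so they need no state of their own.
enum PartitionState { IN_PROGRESS, FINISHED }

final class PartitionBookkeeping {
    private final Map<String, PartitionState> partitions = new ConcurrentHashMap<>();

    // Writer was set up: the partition is now in progress.
    void onSetup(String partitionId) {
        partitions.put(partitionId, PartitionState.IN_PROGRESS);
    }

    // Writer was failed: the data is already released, only the entry is dropped.
    void onFail(String partitionId) {
        partitions.remove(partitionId);
    }

    // Writer was finished: from now on the TaskExecutor must issue the release call.
    void onFinish(String partitionId) {
        partitions.replace(partitionId, PartitionState.IN_PROGRESS, PartitionState.FINISHED);
    }

    // Returns null if the partition is not tracked.
    PartitionState stateOf(String partitionId) {
        return partitions.get(partitionId);
    }
}
```

   Using the three-argument `Map.replace` means only partitions that were actually set up transition to finished; a finish for an unknown partition leaves the table untouched.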
   
   At some point an external actor will issue a call to the TaskExecutor to release partitions, which the TaskExecutor forwards to the environment. The partitions are released and removed from the book-keeping.
   
   If no more partitions and slots are held for a given job, the TaskExecutor terminates its connection with the JobManager.
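   The release path and the disconnect condition could be sketched as follows. This is a hypothetical illustration: `JobResourceTracker` groups finished partitions per job, and the per-job slot count is a placeholder for however the TaskExecutor actually tracks allocated slots.

```java
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical per-job view of what keeps a JobManager connection alive.
final class JobResourceTracker {
    private final Map<String, Set<String>> partitionsByJob = new ConcurrentHashMap<>();
    private final Map<String, Integer> slotsByJob = new ConcurrentHashMap<>();

    void addFinishedPartition(String jobId, String partitionId) {
        partitionsByJob.computeIfAbsent(jobId, j -> ConcurrentHashMap.newKeySet()).add(partitionId);
    }

    // Placeholder for the TaskExecutor's slot book-keeping.
    void setSlotCount(String jobId, int slots) {
        slotsByJob.put(jobId, slots);
    }

    // An external actor asked to release these partitions; releasing the actual
    // data would be forwarded to the shuffle environment.
    void releasePartitions(String jobId, Collection<String> partitionIds) {
        Set<String> held = partitionsByJob.get(jobId);
        if (held != null) {
            held.removeAll(partitionIds);
        }
    }

    // The connection may only be terminated once the job holds neither partitions nor slots.
    boolean canCloseJobManagerConnection(String jobId) {
        Set<String> held = partitionsByJob.get(jobId);
        boolean noPartitions = held == null || held.isEmpty();
        boolean noSlots = slotsByJob.getOrDefault(jobId, 0) == 0;
        return noPartitions && noSlots;
    }
}
```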
   
   ### Abnormal events / Special cases
   
   The big special case is losing the connection to the JobManager. In this case the TaskExecutor will cancel all tasks, mark the job as inactive, and release all finished partitions it has _at this point in time_.
   Partitions that are in-progress will NOT be cleaned up by the TaskExecutor, so as not to interfere with the normal shutdown procedure of tasks. Since task cancellation happens asynchronously, releasing in-progress partitions would likely result in undefined and misleading exceptions.
   In a normal cancellation the task will fail all of its partitions, and they will be cleaned up automatically.
   However, a task can finish after the job has already been canceled, in which case it will actually finish its partitions. This is where the isActive flag for jobs comes in. This flag is unset right away when the job is canceled; any partition that is finished for a job that isn't active is automatically released by the `JobAwareShuffleEnvironment`.
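   The connection-loss handling and the late-finish case can be combined into one hypothetical sketch (`ConnectionLossHandler` and its methods are illustrative names; the `released` list stands in for actually releasing partition data, and task cancellation is assumed to happen elsewhere). The job is marked inactive before finished partitions are released, so a partition that finishes after the flag is unset takes the auto-release path instead of being kept.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

final class ConnectionLossHandler {
    enum State { IN_PROGRESS, FINISHED }

    private final Set<String> activeJobs = ConcurrentHashMap.newKeySet();
    // jobId -> (partitionId -> state)
    private final Map<String, Map<String, State>> partitions = new ConcurrentHashMap<>();
    // Records released partition IDs instead of releasing real data.
    final List<String> released = new CopyOnWriteArrayList<>();

    void markJobActive(String jobId) {
        activeJobs.add(jobId);
    }

    // Task thread: a writer was set up.
    void onSetup(String jobId, String partitionId) {
        partitions.computeIfAbsent(jobId, j -> new ConcurrentHashMap<>())
                .put(partitionId, State.IN_PROGRESS);
    }

    // Task thread: a writer was finished.
    void onFinish(String jobId, String partitionId) {
        Map<String, State> forJob = partitions.get(jobId);
        if (forJob == null) {
            return;
        }
        if (activeJobs.contains(jobId)) {
            forJob.replace(partitionId, State.FINISHED);
        } else if (forJob.remove(partitionId) != null) {
            // Job is no longer active: nobody will ever request this partition.
            released.add(partitionId);
        }
    }

    // TM thread: the JobManager connection was lost.
    void onJobManagerConnectionLost(String jobId) {
        activeJobs.remove(jobId); // unset first, see above
        Map<String, State> forJob = partitions.get(jobId);
        if (forJob == null) {
            return;
        }
        // Release only what is finished right now; in-progress partitions are left to their tasks.
        forJob.entrySet().removeIf(e -> {
            if (e.getValue() == State.FINISHED) {
                released.add(e.getKey());
                return true;
            }
            return false;
        });
    }
}
```

   This sketch does not try to close the small window between the active check and the state update in `onFinish`; it only illustrates the ordering described above.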
   
   ### The why-is-it-implemented-like-this section
   
   #### What about concurrency?
   
   The low-level data structure for keeping track of partitions is the `ResultPartitionTable`, which is thread-safe. This class is accessed by Task threads via listeners, and by TM threads when release calls have been issued. This is where the majority of concurrent accesses will take place.
   In the `JobAwareShuffleEnvironment` there is one additional point of concurrent access: the set of active jobs, which is queried by task threads when finishing partitions and updated by the TM thread when establishing/terminating JobManager connections.
   No additional locks have been introduced, so there should not be any risk of deadlocks.
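   To make the "no additional locks" argument concrete, here is a hypothetical stand-in for a thread-safe partition table that delegates all synchronization to `ConcurrentHashMap`; it is not the actual `ResultPartitionTable`. The demo lets several task threads register partitions while a TM thread concurrently releases the even-numbered ones.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical lock-free (from the caller's view) partition table.
final class ConcurrentPartitionTable {
    private final Set<String> partitions = ConcurrentHashMap.newKeySet();

    // Task threads, via the writer listeners.
    void add(String partitionId) {
        partitions.add(partitionId);
    }

    // TM thread, when a release call arrives; true if the entry was present.
    boolean release(String partitionId) {
        return partitions.remove(partitionId);
    }

    int size() {
        return partitions.size();
    }

    // Runs the demo and returns how many partitions remain in the table.
    static int runDemo(int tasks, int partitionsPerTask) {
        ConcurrentPartitionTable table = new ConcurrentPartitionTable();
        List<Thread> threads = new ArrayList<>();
        for (int t = 0; t < tasks; t++) {
            final int task = t;
            threads.add(new Thread(() -> {
                for (int p = 0; p < partitionsPerTask; p++) {
                    table.add(task + "-" + p);
                }
            }));
        }
        threads.add(new Thread(() -> {
            for (int i = 0; i < tasks; i++) {
                for (int p = 0; p < partitionsPerTask; p += 2) {
                    String id = i + "-" + p;
                    while (!table.release(id)) {
                        Thread.onSpinWait(); // not added yet, retry
                    }
                }
            }
        }));
        threads.forEach(Thread::start);
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for demo threads", e);
        }
        return table.size();
    }
}
```

   Each operation is a single call on a concurrent collection and no code path acquires a lock while holding another, so there is no lock ordering that could form a cycle.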
   
   #### Is it safe to have the Task thread release partitions if the job is inactive?
   
   Yes, as this is equivalent to failing a partition, which Task threads 
already do.
   
   #### Could we not handle this on the TaskExecutor level without wrappers?
   
   Potentially, but the problem here is that the TaskExecutor is quite far removed from what is actually happening in the task, and keeping the book-keeping in sync introduces a few challenges.
   For example, there is no good point at which one can add partitions to the book-keeping. Partitions are set up within the task's `run()` method, at _some point_ after it was started.
   Without introducing a callback in the Task you inevitably end up in an inconsistent state, and have to ensure that _whatever happens in the task_ this situation is rectified eventually. This introduces several new edge-cases that we could do without. (For example, a task can be canceled right away, in which case it neither sets up partitions nor issues any callback regarding its final state.)
   
   #### Could we not handle this inside the ShuffleEnvironment?
   
   Yes, we certainly could. But this would require re-introducing the concept of jobs into the environment, which was removed only recently, and would additionally require every implementation to implement this logic correctly, even though it would be the same across all implementations.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
