Hey all,
I have been doing some digging to see if there is a good way to perform
idempotent job submission. I was hoping to write a job submission agent that
does the following:
1. Checks whether the cluster is running yet (i.e. it can contact a JobManager).
2. Checks whether the job it is watching is running.
3. Submits the job if it is not yet running.
4. Retries if there are any issues.
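To make the agent concrete, here is a minimal Java sketch of those four steps.
`ClusterGateway` is a hypothetical interface made up for illustration, not a
Flink API; a real agent would back it with the CLI or the web frontend. Note
that steps 2 and 3 are a plain check-then-act, which is exactly the part that is
not atomic.

```java
import java.util.concurrent.TimeUnit;

/**
 * Sketch of the naive submission agent. ClusterGateway is a HYPOTHETICAL
 * interface, not a Flink API.
 */
public class SubmissionAgent {

    /** Hypothetical view of the cluster used by the agent. */
    public interface ClusterGateway {
        boolean canReachJobManager();
        boolean isJobRunning(String jobName);
        void submitJob(String jobName) throws Exception;
    }

    private final ClusterGateway gateway;
    private final int maxAttempts;
    private final long backoffMillis;

    public SubmissionAgent(ClusterGateway gateway, int maxAttempts, long backoffMillis) {
        this.gateway = gateway;
        this.maxAttempts = maxAttempts;
        this.backoffMillis = backoffMillis;
    }

    /** Returns true if the job was seen running or was submitted without error. */
    public boolean ensureRunning(String jobName) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                // 1. Is the cluster up (can we contact a JobManager)?
                if (gateway.canReachJobManager()) {
                    // 2. Is the watched job already running?
                    if (gateway.isJobRunning(jobName)) {
                        return true;
                    }
                    // 3. Not listed as running: submit it. This check-then-act
                    //    is NOT atomic with respect to JobManager recovery.
                    gateway.submitJob(jobName);
                    return true;
                }
            } catch (Exception e) {
                // 4. Any failure: fall through and retry after a backoff.
            }
            try {
                TimeUnit.MILLISECONDS.sleep(backoffMillis);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }
}
```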
Specifically, at the moment there doesn’t seem to be any way to submit a job
only if it isn’t already running. As far as I can tell, the current interface
makes a race condition possible.
For example, consider the following sequence of events:
1. The JobManager fails and a new leader is elected:
* The new JobManager asynchronously starts restoring jobs:
here<https://github.com/apache/flink/blob/release-1.2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L300>
2. The client calls to list the currently running jobs (before the jobs are
restored) and gets back an incomplete list of running
jobs<https://github.com/apache/flink/blob/release-1.2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L1009>,
because SubmitJob is what registers jobs in
currentJobs<https://github.com/apache/flink/blob/release-1.2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L1300>.
3. The client assumes the job is no longer running, so it uses
HTTP/CLI/whatever to restore the job.
4. The current interfaces don’t pass in the same JobID (a new one is generated
for each submission), so a new job is submitted with a new JobID.
5. The JobManager restores the previous instance of the running job.
6. Now there are two instances of the job running in the cluster.
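Here is a toy Java model of that sequence. It is not Flink code: `ToyJobManager`
and its two maps (jobs persisted for recovery vs. jobs currently registered) are
simplifications assumed for illustration. It replays steps 1 to 6 twice: once
with a fresh random JobID per submission, as the current interfaces do, and once
with a hypothetical JobID derived deterministically from the job name via
`UUID.nameUUIDFromBytes`, where resubmitting an already-known ID is a no-op.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Toy model of the failover/resubmit race; not Flink code. */
public class RecoveryRace {

    /** Simplified JobManager state. */
    static class ToyJobManager {
        final Map<String, String> persistedJobs = new ConcurrentHashMap<>(); // jobId -> name
        final Map<String, String> currentJobs = new ConcurrentHashMap<>();   // jobId -> name

        /** Registers a job; putIfAbsent makes resubmitting the same jobId a no-op. */
        boolean submit(String jobId, String name) {
            persistedJobs.putIfAbsent(jobId, name);
            return currentJobs.putIfAbsent(jobId, name) == null;
        }

        /** Leader failover: registered jobs are gone until recovery re-adds them. */
        void failover() { currentJobs.clear(); }

        /** Asynchronous recovery, modeled here as one step run later. */
        void recoverJobs() { persistedJobs.forEach(currentJobs::putIfAbsent); }

        List<String> runningJobNames() { return new ArrayList<>(currentJobs.values()); }
    }

    /** Current behavior: a new JobID per submission (name is ignored). */
    public static String randomJobId(String name) { return UUID.randomUUID().toString(); }

    /** HYPOTHETICAL: a JobID derived deterministically from the job name. */
    public static String deterministicJobId(String name) {
        return UUID.nameUUIDFromBytes(name.getBytes(StandardCharsets.UTF_8)).toString();
    }

    /** Replays steps 1-6; returns how many instances of the job end up running. */
    public static long replay(Function<String, String> jobIdFor) {
        ToyJobManager jm = new ToyJobManager();
        jm.submit(jobIdFor.apply("my-job"), "my-job");   // job running before the failure
        jm.failover();                                   // 1. new leader, recovery pending
        if (!jm.runningJobNames().contains("my-job")) {  // 2. incomplete job list
            jm.submit(jobIdFor.apply("my-job"), "my-job"); // 3-4. client resubmits
        }
        jm.recoverJobs();                                // 5. old instance restored
        return jm.runningJobNames().stream().filter("my-job"::equals).count(); // 6.
    }

    public static void main(String[] args) {
        System.out.println("random JobIDs:        " + replay(RecoveryRace::randomJobId));
        System.out.println("deterministic JobIDs: " + replay(RecoveryRace::deterministicJobId));
    }
}
```

Running `main` prints 2 for random IDs and 1 for deterministic IDs. Under these
assumptions, a stable JobID combined with submit-if-absent semantics is what
would make the agent's resubmission idempotent.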
While this state is pretty unlikely to be hit when submitting jobs manually, it
seems to me that an agent like the one above could end up hitting it if the
cluster were having trouble with failing JobManagers.
I can see that FLIP-6<https://issues.apache.org/jira/browse/FLINK-4319> is
rewriting the JobManager itself. From my reading of the current code base, this
work is about half done in master.
From my reading of the code and docs, it seems that on the submission side the
expectation for Docker/Kubernetes is that you will create two sets of
containers:
1. A JobMaster/ResourceManager container that contains the user’s job in
some form (jar or as a serialized JobGraph).
2. A TaskManager container that is either generic or potentially has user
libs (up to the implementer/cluster maintainer).
As I currently understand the code, the JobMaster instances will:
1. Start up a JobMasterRunner, which connects to the leader service and
creates a JobMaster with the supplied JobGraph (which I assume will always have
the same JobID for restore purposes).
2. When the node is granted leadership, the JobMasterRunner starts the
JobMaster, which schedules the ExecutionGraph it created from the supplied
JobGraph.
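A rough Java sketch of those two steps, as I read them. `JobGraph`, `JobMaster`
and `JobMasterRunner` here are hypothetical stand-ins that only mirror the flow
described above, not the real FLIP-6 classes or their signatures. The JobMaster
is created up front from a JobGraph with a fixed JobID, and nothing is scheduled
until leadership is granted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Toy model of the JobMaster start-up flow; all types are HYPOTHETICAL stand-ins. */
public class JobMasterFlow {

    /** The JobGraph shipped in the container; its jobId stays fixed across restarts. */
    static final class JobGraph {
        final String jobId;
        final String name;
        JobGraph(String jobId, String name) { this.jobId = jobId; this.name = name; }
    }

    /** Stand-in JobMaster: records building and scheduling an "ExecutionGraph". */
    static final class JobMaster {
        final JobGraph jobGraph;
        final List<String> log = new ArrayList<>();
        JobMaster(JobGraph jobGraph) { this.jobGraph = jobGraph; }

        void start(UUID leaderSessionId) {
            log.add("built ExecutionGraph for " + jobGraph.jobId);
            log.add("scheduled " + jobGraph.name + " under session " + leaderSessionId);
        }
    }

    /** Stand-in runner: step 1 creates the JobMaster, step 2 starts it on leadership. */
    static final class JobMasterRunner {
        final JobMaster jobMaster;
        boolean leader;

        JobMasterRunner(JobGraph jobGraph) { this.jobMaster = new JobMaster(jobGraph); }

        /** Callback a (hypothetical) leader election service would invoke. */
        void grantLeadership(UUID leaderSessionId) {
            leader = true;
            jobMaster.start(leaderSessionId);
        }
    }

    public static void main(String[] args) {
        JobGraph graph = new JobGraph("00000000000000000000000000000001", "my-job");
        JobMasterRunner runner = new JobMasterRunner(graph); // 1. create JobMaster
        runner.grantLeadership(UUID.randomUUID());           // 2. leadership -> schedule
        runner.jobMaster.log.forEach(System.out::println);
    }
}
```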
This all seems fine for a new job submission, but since the restore logic is
not yet implemented, I am wondering how people will interact with clusters for
job submission. From this
doc<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077>
it appears that the current JobManager infrastructure will instead become a
“FlinkDispatcher”. Is the intent to put the logic for launching and restoring
jobs from savepoints in the FlinkDispatcher and have it control the job upgrade
lifecycle?
We are currently looking at running Flink on Kubernetes, and FLIP-6 looks like
it will organize that much better than the way things currently work.
Specifically, we want to give clients a clean, clear deployment/upgrade path for
Flink jobs that can be integrated into automated build pipelines and the like.
Is the intention for the new system to have another orchestration layer for
upgrading jobs, or will the JobMaster itself handle those situations?
To me the JobMaster seems like the right place to coordinate job upgrades,
because it is the single point of control for a single job. Then, on Kubernetes
for example, one would just have to relaunch the JobMaster containers, and the
JobMaster logic would take care of the rest, consolidating the upgraded
JobGraph. On the other hand, this might not fit cleanly into the separation of
concerns that currently exists within the JobManager.
I am also wondering what work still needs doing on FLIP-6. Overall, it closely
aligns with what we are trying to do on our end to make Flink easier to use, so
I might get some time to help out with this effort.
Thanks,
James Bucher