[ https://issues.apache.org/jira/browse/FLINK-19206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17234077#comment-17234077 ]

Mike Kaplinskiy commented on FLINK-19206:
-----------------------------------------

Hey Yang, thanks for the quick response, and apologies for the slow reply on my 
part.


You are correct that I'm not generating a fixed cluster id, though I could if 
necessary (maybe using the pod name?). My use case is batch jobs - so 
savepoints don't help. I am using an application cluster now - mostly because 
persistent clusters are wasteful when you use kube-autoscaler. The problem I'm 
facing is that the "launcher" itself can be terminated (i.e. the command running 
"./bin/flink run-application ...").

Unfortunately "not retrying externally" is really not an option in a kube 
cluster - nodes, pods & jobs restarting is normal - even outside of the Flink 
job failing. In a scenario where a cluster is started externally to kube (via, 
well, a user) - you're completely right that there should not be an owner 
reference on any cluster resource. That cluster needs to stay running until 
someone says to delete it. But if there's automation to start a cluster - 
automation that is itself managed by kube - I don't see how else to do it 
without an owner reference.

Ultimately, here's a very concrete use case: Kubernetes cron jobs [1]. If I set 
up a CronJob to run a container that starts a Flink job, one of the options I 
could set is {{concurrencyPolicy: Replace}}. This means that if a previous 
instance of the cron job is still running when it's time to run again, the old 
job is killed (via a command similar to {{kubectl delete job 
cronjob-<previous_run>}}). This deletes the Job and the Pod that launched Flink 
(the Pod has an owner reference to the Job), but not any Flink cluster that the 
Pod might have launched. There are other scenarios that can cause the pod to be 
shot - including {{activeDeadlineSeconds}}, or people "re-running" jobs via 
{{kubectl create job --from=cronjob/foo foo-rerun}} - which are not affected by 
{{concurrencyPolicy}} at all (and hence DON'T get killed when running over 
their time limit).
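
To make this concrete, here is a rough sketch of the kind of CronJob I have in 
mind - the name, image, schedule, cluster id and jar path are all placeholders 
for illustration, not anything Flink ships:

{code:yaml}
# Sketch only: a CronJob whose pod does nothing but run "flink run-application".
# "concurrencyPolicy: Replace" kills this Job (and its pod, via the owner
# reference) when the next run is due, but not the Flink cluster the pod started.
apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: flink-batch-launcher              # placeholder name
spec:
  schedule: "0 * * * *"
  concurrencyPolicy: Replace              # kill a still-running previous launcher
  jobTemplate:
    spec:
      activeDeadlineSeconds: 3600         # another way the launcher can be shot
      template:
        spec:
          restartPolicy: Never
          containers:
          - name: launcher
            image: flink:1.11                            # placeholder image
            command:
            - /opt/flink/bin/flink
            - run-application
            - --target
            - kubernetes-application
            - -Dkubernetes.cluster-id=my-batch-job       # placeholder cluster id
            - local:///opt/flink/usrlib/my-job.jar       # placeholder jar path
{code}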


In short, batch jobs get created in a lot of different ways - and most of them 
do start via some kind of automation. Having the ability to cascade deletions 
to any created Flink resources would be a valuable addition to make Flink "act 
native" in the eyes of Kubernetes users - i.e. if you create a pod to start a 
cluster, deleting said pod deletes the cluster. For other use cases - for 
example if you want to create a custom resource in kube to manage Flink jobs 
[2] - it would be useful to cascade deletions as well.
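
For reference, this is roughly the end state I'm after on the cluster's 
internal service - purely a sketch of what the launcher (or some automation) 
would stamp onto it, not an existing Flink option; the pod name and uid below 
are placeholders that would have to match the live launcher pod:

{code:yaml}
# Sketch only: the internal service that already owns the JM/TM deployments,
# with an extra ownerReference pointing back at the launcher pod.
apiVersion: v1
kind: Service
metadata:
  name: my-batch-job                      # the <cluster-id> internal service
  ownerReferences:
  - apiVersion: v1
    kind: Pod
    name: flink-batch-launcher-x7k2p      # placeholder: pod running the launcher
    uid: "<uid of that pod>"              # placeholder: must match the pod's uid
    controller: false
    blockOwnerDeletion: false
{code}

With that in place, kube's garbage collector would cascade the delete from the 
launcher pod to the service, and from the service to the rest of the cluster 
components that already point at it.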


[1] [https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/]

[2] [https://github.com/GoogleCloudPlatform/flink-on-k8s-operator]

> Add an ability to set ownerReference manually in Kubernetes
> -----------------------------------------------------------
>
>                 Key: FLINK-19206
>                 URL: https://issues.apache.org/jira/browse/FLINK-19206
>             Project: Flink
>          Issue Type: Improvement
>          Components: Deployment / Kubernetes
>            Reporter: Mike Kaplinskiy
>            Priority: Minor
>
> The current Kubernetes deployment creates a service that is the 
> ownerReference of all the sub-objects (the JM & TM deployments & the rest 
> service). However, something presumably has to start the cluster in the first 
> place. If you are using a job cluster, that can be something like a 
> kubernetes Job, a CronJob or a tool like Airflow. Unfortunately any failures 
> in the Flink job can cause retries from these higher-level primitives, which 
> can yield a lot of "stale clusters" that aren't GCed.
> The proposal here is to add a configuration option to set the ownerReference 
> of the Flink Service. This way the service (and by proxy, all the cluster 
> components) get deleted when the "parent" decides - including if the parent 
> is itself a Kubernetes pod. For reference, Spark does something similar via 
> {{spark.kubernetes.driver.pod.name}} (documented at 
> [https://spark.apache.org/docs/latest/running-on-kubernetes.html#client-mode-executor-pod-garbage-collection]).


