fenglu-g commented on a change in pull request #3532: [AIRFLOW-2658] Add GCP
specific k8s pod operator
URL: https://github.com/apache/incubator-airflow/pull/3532#discussion_r206629560
##########
File path: airflow/contrib/operators/gcp_container_operator.py
##########
@@ -170,3 +175,147 @@ def execute(self, context):
hook = GKEClusterHook(self.project_id, self.location)
create_op = hook.create_cluster(cluster=self.body)
return create_op
+
+
+KUBE_CONFIG_ENV_VAR = "KUBECONFIG"
+G_APP_CRED = "GOOGLE_APPLICATION_CREDENTIALS"
+
+
+class GKEPodOperator(KubernetesPodOperator):
+ template_fields = ('project_id', 'location',
+ 'cluster_name') + KubernetesPodOperator.template_fields
+
+ @apply_defaults
+ def __init__(self,
+ project_id,
+ location,
+ cluster_name,
+ gcp_conn_id='google_cloud_default',
+ *args,
+ **kwargs):
+ """
+ Executes a task in a Kubernetes pod in the specified Google Kubernetes
+ Engine cluster
+
+ This Operator assumes that the system has gcloud installed and either
+ has working default application credentials or has configured a
+ connection id with a service account.
+
+ The **minimum** arguments required to launch a pod are the variables
+ ``task_id``, ``project_id``, ``location``, ``cluster_name``, ``name``,
+ ``namespace``, and ``image``
+
+ **Operator Creation**: ::
+
+ operator = GKEPodOperator(task_id='pod_op',
+ project_id='my-project',
+ location='us-central1-a',
+ cluster_name='my-cluster-name',
+ name='task-name',
+ namespace='default',
+ image='perl')
+
+ .. seealso::
+ For more detail about application authentication have a look at
the reference:
+
https://cloud.google.com/docs/authentication/production#providing_credentials_to_your_application
+
+ :param project_id: The Google Developers Console project id
+ :type project_id: str
+ :param location: The name of the Google Kubernetes Engine zone in
which the
+ cluster resides, e.g. 'us-central1-a'
+ :type location: str
+ :param cluster_name: The name of the Google Kubernetes Engine cluster
the pod
+ should be spawned in
+ :type cluster_name: str
+ :param gcp_conn_id: The google cloud connection id to use. This allows
for
+ users to specify a service account.
+ :type gcp_conn_id: str
+ """
+ super(GKEPodOperator, self).__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.location = location
+ self.cluster_name = cluster_name
+ self.gcp_conn_id = gcp_conn_id
+
+ def execute(self, context):
+ # Specifying a service account file allows the user to use non-default
+ # authentication for creating a Kubernetes Pod. This is done by
setting the
+ # environment variable `GOOGLE_APPLICATION_CREDENTIALS` that gcloud
looks at.
+ key_file = None
+
+ # If gcp_conn_id is not specified gcloud will use the default
+ # service account credentials.
+ if self.gcp_conn_id:
+ from airflow.hooks.base_hook import BaseHook
+ # extras is a deserialized json object
+ extras = BaseHook.get_connection(self.gcp_conn_id).extra_dejson
+ # key_file only gets set if a json file is created from a JSON
string in
+ # the web ui, else none
+ key_file = self._set_env_from_extras(extras=extras)
+
+ # Write config to a temp file and set the environment variable to
point to it.
+ # This is to avoid race conditions of reading/writing a single file
+ with tempfile.NamedTemporaryFile() as conf_file:
+ os.environ[KUBE_CONFIG_ENV_VAR] = conf_file.name
+ # Attempt to get/update credentials
+ # We call gcloud directly instead of using google-cloud-python api
+ # because there is no way to write kubernetes config to a file,
which is
+ # required by KubernetesPodOperator.
+ # The gcloud command looks at the env variable `KUBECONFIG` for
where to save
+ # the kubernetes config file.
+ subprocess.check_call(
+ ["gcloud", "container", "clusters", "get-credentials",
+ self.cluster_name,
+ "--zone", self.location,
+ "--project", self.project_id])
+
+ # Since the key file is of type mkstemp() closing the file will
delete it from
+ # the file system so it cannot be accessed after we don't need it
anymore
+ if key_file is not None:
Review comment:
nit: this check can be simplified to `if key_file:`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services