[ https://issues.apache.org/jira/browse/AIRFLOW-3220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16684483#comment-16684483 ]

ASF GitHub Bot commented on AIRFLOW-3220:
-----------------------------------------

kaxil closed pull request #4167: [AIRFLOW-3220] Implement Instance Group 
Manager Operators for GCE
URL: https://github.com/apache/incubator-airflow/pull/4167
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/.gitignore b/.gitignore
index d005437890..114236d467 100644
--- a/.gitignore
+++ b/.gitignore
@@ -146,3 +146,9 @@ npm-debug.log*
 static/dist
 derby.log
 metastore_db
+
+# Airflow log files when airflow is run locally
+airflow-*.err
+airflow-*.out
+airflow-*.log
+airflow-*.pid
diff --git a/airflow/contrib/example_dags/example_gcp_compute.py 
b/airflow/contrib/example_dags/example_gcp_compute.py
index e4abe2e152..51a55b6a99 100644
--- a/airflow/contrib/example_dags/example_gcp_compute.py
+++ b/airflow/contrib/example_dags/example_gcp_compute.py
@@ -24,7 +24,7 @@
 This DAG relies on the following Airflow variables
 https://airflow.apache.org/concepts.html#variables
 * PROJECT_ID - Google Cloud Platform project where the Compute Engine instance 
exists.
-* LOCATION - Google Cloud Platform zone where the instance exists.
+* ZONE - Google Cloud Platform zone where the instance exists.
 * INSTANCE - Name of the Compute Engine instance.
 * SHORT_MACHINE_TYPE_NAME - Machine type resource name to set, e.g. 
'n1-standard-1'.
     See https://cloud.google.com/compute/docs/machine-types
@@ -37,19 +37,23 @@
 from airflow.contrib.operators.gcp_compute_operator import 
GceInstanceStartOperator, \
     GceInstanceStopOperator, GceSetMachineTypeOperator
 
-# [START howto_operator_gce_args]
-PROJECT_ID = models.Variable.get('PROJECT_ID', '')
-LOCATION = models.Variable.get('LOCATION', '')
-INSTANCE = models.Variable.get('INSTANCE', '')
-SHORT_MACHINE_TYPE_NAME = models.Variable.get('SHORT_MACHINE_TYPE_NAME', '')
-SET_MACHINE_TYPE_BODY = {
-    'machineType': 'zones/{}/machineTypes/{}'.format(LOCATION, 
SHORT_MACHINE_TYPE_NAME)
-}
+# [START howto_operator_gce_args_common]
+PROJECT_ID = models.Variable.get('PROJECT_ID', 'example-airflow')
+ZONE = models.Variable.get('ZONE', 'europe-west1-b')
+INSTANCE = models.Variable.get('INSTANCE', 'test-instance')
 
 default_args = {
     'start_date': airflow.utils.dates.days_ago(1)
 }
-# [END howto_operator_gce_args]
+# [END howto_operator_gce_args_common]
+
+# [START howto_operator_gce_args_set_machine_type]
+SHORT_MACHINE_TYPE_NAME = models.Variable.get('SHORT_MACHINE_TYPE_NAME', 
'n1-standard-1')
+SET_MACHINE_TYPE_BODY = {
+    'machineType': 'zones/{}/machineTypes/{}'.format(ZONE, 
SHORT_MACHINE_TYPE_NAME)
+}
+# [END howto_operator_gce_args_set_machine_type]
+
 
 with models.DAG(
     'example_gcp_compute',
@@ -59,7 +63,7 @@
     # [START howto_operator_gce_start]
     gce_instance_start = GceInstanceStartOperator(
         project_id=PROJECT_ID,
-        zone=LOCATION,
+        zone=ZONE,
         resource_id=INSTANCE,
         task_id='gcp_compute_start_task'
     )
@@ -67,14 +71,14 @@
     # Duplicate start for idempotence testing
     gce_instance_start2 = GceInstanceStartOperator(
         project_id=PROJECT_ID,
-        zone=LOCATION,
+        zone=ZONE,
         resource_id=INSTANCE,
         task_id='gcp_compute_start_task2'
     )
     # [START howto_operator_gce_stop]
     gce_instance_stop = GceInstanceStopOperator(
         project_id=PROJECT_ID,
-        zone=LOCATION,
+        zone=ZONE,
         resource_id=INSTANCE,
         task_id='gcp_compute_stop_task'
     )
@@ -82,14 +86,14 @@
     # Duplicate stop for idempotence testing
     gce_instance_stop2 = GceInstanceStopOperator(
         project_id=PROJECT_ID,
-        zone=LOCATION,
+        zone=ZONE,
         resource_id=INSTANCE,
         task_id='gcp_compute_stop_task2'
     )
     # [START howto_operator_gce_set_machine_type]
     gce_set_machine_type = GceSetMachineTypeOperator(
         project_id=PROJECT_ID,
-        zone=LOCATION,
+        zone=ZONE,
         resource_id=INSTANCE,
         body=SET_MACHINE_TYPE_BODY,
         task_id='gcp_compute_set_machine_type'
@@ -98,7 +102,7 @@
     # Duplicate set machine type for idempotence testing
     gce_set_machine_type2 = GceSetMachineTypeOperator(
         project_id=PROJECT_ID,
-        zone=LOCATION,
+        zone=ZONE,
         resource_id=INSTANCE,
         body=SET_MACHINE_TYPE_BODY,
         task_id='gcp_compute_set_machine_type2'
diff --git a/airflow/contrib/example_dags/example_gcp_compute_igm.py 
b/airflow/contrib/example_dags/example_gcp_compute_igm.py
new file mode 100644
index 0000000000..dc24259f9f
--- /dev/null
+++ b/airflow/contrib/example_dags/example_gcp_compute_igm.py
@@ -0,0 +1,143 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that uses IGM-type compute operations:
+* copy of Instance Template
+* update template in Instance Group Manager
+
+This DAG relies on the following OS environment variables
+
+* PROJECT_ID - the Google Cloud Platform project where the Compute Engine 
instance exists
+* ZONE - the zone where the Compute Engine instance exists
+
+Variables for copy template operator:
+* TEMPLATE_NAME - name of the template to copy
+* NEW_TEMPLATE_NAME - name of the new template
+* NEW_DESCRIPTION - description added to the template
+
+Variables for update template in Group Manager:
+
+* INSTANCE_GROUP_MANAGER_NAME - name of the Instance Group Manager
+* SOURCE_TEMPLATE_URL - URL of the template to replace in the Instance Group Manager
+* DESTINATION_TEMPLATE_URL - URL of the new template to set in the Instance Group Manager
+"""
+
+import os
+import datetime
+
+import airflow
+from airflow import models
+from airflow.contrib.operators.gcp_compute_operator import \
+    GceInstanceTemplateCopyOperator, 
GceInstanceGroupManagerUpdateTemplateOperator
+
+# [START howto_operator_compute_igm_common_args]
+PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
+ZONE = os.environ.get('ZONE', 'europe-west1-b')
+
+default_args = {
+    'start_date': airflow.utils.dates.days_ago(1)
+}
+# [END howto_operator_compute_igm_common_args]
+
+# [START howto_operator_compute_template_copy_args]
+TEMPLATE_NAME = os.environ.get('TEMPLATE_NAME', 'instance-template-test')
+NEW_TEMPLATE_NAME = os.environ.get('NEW_TEMPLATE_NAME',
+                                   'instance-template-test-new')
+NEW_DESCRIPTION = os.environ.get('NEW_DESCRIPTION', 'Test new description')
+GCE_INSTANCE_TEMPLATE_BODY_UPDATE = {
+    "name": NEW_TEMPLATE_NAME,
+    "description": NEW_DESCRIPTION,
+    "properties": {
+        "machineType": "n1-standard-2"
+    }
+}
+# [END howto_operator_compute_template_copy_args]
+
+# [START howto_operator_compute_igm_update_template_args]
+INSTANCE_GROUP_MANAGER_NAME = os.environ.get('INSTANCE_GROUP_MANAGER_NAME',
+                                             'instance-group-test')
+
+SOURCE_TEMPLATE_URL = os.environ.get(
+    'SOURCE_TEMPLATE_URL',
+    "https://www.googleapis.com/compute/beta/projects/"
+    "example-project/global/instanceTemplates/instance-template-test")
+
+DESTINATION_TEMPLATE_URL = os.environ.get(
+    'DESTINATION_TEMPLATE_URL',
+    "https://www.googleapis.com/compute/beta/projects/"
+    "example-airflow/global/instanceTemplates/" + NEW_TEMPLATE_NAME)
+
+UPDATE_POLICY = {
+    "type": "OPPORTUNISTIC",
+    "minimalAction": "RESTART",
+    "maxSurge": {
+        "fixed": 1
+    },
+    "minReadySec": 1800
+}
+
+# [END howto_operator_compute_igm_update_template_args]
+
+
+with models.DAG(
+    'example_gcp_compute_igm',
+    default_args=default_args,
+    schedule_interval=datetime.timedelta(days=1)
+) as dag:
+    # [START howto_operator_gce_igm_copy_template]
+    gce_instance_template_copy = GceInstanceTemplateCopyOperator(
+        project_id=PROJECT_ID,
+        resource_id=TEMPLATE_NAME,
+        body_patch=GCE_INSTANCE_TEMPLATE_BODY_UPDATE,
+        task_id='gcp_compute_igm_copy_template_task'
+    )
+    # [END howto_operator_gce_igm_copy_template]
+    # Added to check for idempotence
+    gce_instance_template_copy2 = GceInstanceTemplateCopyOperator(
+        project_id=PROJECT_ID,
+        resource_id=TEMPLATE_NAME,
+        body_patch=GCE_INSTANCE_TEMPLATE_BODY_UPDATE,
+        task_id='gcp_compute_igm_copy_template_task_2'
+    )
+    # [START howto_operator_gce_igm_update_template]
+    gce_instance_group_manager_update_template = \
+        GceInstanceGroupManagerUpdateTemplateOperator(
+            project_id=PROJECT_ID,
+            resource_id=INSTANCE_GROUP_MANAGER_NAME,
+            zone=ZONE,
+            source_template=SOURCE_TEMPLATE_URL,
+            destination_template=DESTINATION_TEMPLATE_URL,
+            update_policy=UPDATE_POLICY,
+            task_id='gcp_compute_igm_group_manager_update_template'
+        )
+    # [END howto_operator_gce_igm_update_template]
+    # Added to check for idempotence (and without UPDATE_POLICY)
+    gce_instance_group_manager_update_template2 = \
+        GceInstanceGroupManagerUpdateTemplateOperator(
+            project_id=PROJECT_ID,
+            resource_id=INSTANCE_GROUP_MANAGER_NAME,
+            zone=ZONE,
+            source_template=SOURCE_TEMPLATE_URL,
+            destination_template=DESTINATION_TEMPLATE_URL,
+            task_id='gcp_compute_igm_group_manager_update_template_2'
+        )
+    gce_instance_template_copy >> gce_instance_template_copy2 >> \
+        gce_instance_group_manager_update_template >> \
+        gce_instance_group_manager_update_template2
diff --git a/airflow/contrib/example_dags/example_gcp_function_delete.py 
b/airflow/contrib/example_dags/example_gcp_function_delete.py
index 30f5369af6..d87eed39c5 100644
--- a/airflow/contrib/example_dags/example_gcp_function_delete.py
+++ b/airflow/contrib/example_dags/example_gcp_function_delete.py
@@ -33,9 +33,9 @@
 from airflow.contrib.operators.gcp_function_operator import 
GcfFunctionDeleteOperator
 
 # [START howto_operator_gcf_delete_args]
-PROJECT_ID = models.Variable.get('PROJECT_ID', '')
-LOCATION = models.Variable.get('LOCATION', '')
-ENTRYPOINT = models.Variable.get('ENTRYPOINT', '')
+PROJECT_ID = models.Variable.get('PROJECT_ID', 'example-airflow')
+LOCATION = models.Variable.get('LOCATION', 'europe-west1')
+ENTRYPOINT = models.Variable.get('ENTRYPOINT', 'helloWorld')
 # A fully-qualified name of the function to delete
 
 FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, 
LOCATION,
diff --git a/airflow/contrib/example_dags/example_gcp_function_deploy_delete.py 
b/airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
index a0e44957b9..606cc181b0 100644
--- a/airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
+++ b/airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
@@ -45,11 +45,14 @@
 from airflow.utils import dates
 
 # [START howto_operator_gcf_deploy_variables]
-PROJECT_ID = models.Variable.get('PROJECT_ID', '')
-LOCATION = models.Variable.get('LOCATION', '')
+PROJECT_ID = models.Variable.get('PROJECT_ID', 'example-airflow')
+LOCATION = models.Variable.get('LOCATION', 'europe-west1')
 SOURCE_ARCHIVE_URL = models.Variable.get('SOURCE_ARCHIVE_URL', '')
 SOURCE_UPLOAD_URL = models.Variable.get('SOURCE_UPLOAD_URL', '')
-SOURCE_REPOSITORY = models.Variable.get('SOURCE_REPOSITORY', '')
+SOURCE_REPOSITORY = models.Variable.get('SOURCE_REPOSITORY',
+                                        'https://source.developers.google.com/'
+                                        'projects/example-airflow/'
+                                        
'repos/hello-world/moveable-aliases/master')
 ZIP_PATH = models.Variable.get('ZIP_PATH', '')
 ENTRYPOINT = models.Variable.get('ENTRYPOINT', '')
 FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, 
LOCATION,
@@ -70,11 +73,7 @@
 
 # [START howto_operator_gcf_deploy_args]
 default_args = {
-    'start_date': dates.days_ago(1),
-    'project_id': PROJECT_ID,
-    'location': LOCATION,
-    'body': body,
-    'validate_body': VALIDATE_BODY
+    'start_date': dates.days_ago(1)
 }
 # [END howto_operator_gcf_deploy_args]
 
@@ -103,11 +102,15 @@
     # [START howto_operator_gcf_deploy]
     deploy_task = GcfFunctionDeployOperator(
         task_id="gcf_deploy_task",
-        name=FUNCTION_NAME
+        name=FUNCTION_NAME,
+        project_id=PROJECT_ID,
+        location=LOCATION,
+        body=body,
+        validate_body=VALIDATE_BODY
     )
     # [END howto_operator_gcf_deploy]
     delete_task = GcfFunctionDeleteOperator(
         task_id="gcf_delete_task",
-        name=FUNCTION_NAME
+        name=FUNCTION_NAME,
     )
     deploy_task >> delete_task
diff --git a/airflow/contrib/hooks/gcp_compute_hook.py 
b/airflow/contrib/hooks/gcp_compute_hook.py
index 5fa088942b..617e39cb40 100644
--- a/airflow/contrib/hooks/gcp_compute_hook.py
+++ b/airflow/contrib/hooks/gcp_compute_hook.py
@@ -68,14 +68,14 @@ def start_instance(self, project_id, zone, resource_id):
         """
         Starts an existing instance defined by project_id, zone and 
resource_id.
 
-        :param project_id: Google Cloud Platform project where the Compute 
Engine
-        instance exists.
+        :param project_id: Google Cloud Platform project ID where the Compute 
Engine
+            Instance exists
         :type project_id: str
-        :param zone: Google Cloud Platform zone where the instance exists.
+        :param zone: Google Cloud Platform zone where the instance exists
         :type zone: str
-        :param resource_id: Name of the Compute Engine instance resource.
+        :param resource_id: Name of the Compute Engine instance resource
         :type resource_id: str
-        :return: True if the operation succeeded, raises an error otherwise
+        :return: True if the operation succeeded, raises an error otherwise.
         :rtype: bool
         """
         response = self.get_conn().instances().start(
@@ -83,21 +83,26 @@ def start_instance(self, project_id, zone, resource_id):
             zone=zone,
             instance=resource_id
         ).execute(num_retries=NUM_RETRIES)
-        operation_name = response["name"]
-        return self._wait_for_operation_to_complete(project_id, zone, 
operation_name)
+        try:
+            operation_name = response["name"]
+        except KeyError:
+            raise AirflowException(
+                "Wrong response '{}' returned - it should contain "
+                "'name' field".format(response))
+        return self._wait_for_operation_to_complete(project_id, 
operation_name, zone)
 
     def stop_instance(self, project_id, zone, resource_id):
         """
-        Stops an instance defined by project_id, zone and resource_id.
+        Stops an instance defined by project_id, zone and resource_id
 
-        :param project_id: Google Cloud Platform project where the Compute 
Engine
-        instance exists.
+        :param project_id: Google Cloud Platform project ID where the Compute 
Engine
+            Instance exists
         :type project_id: str
-        :param zone: Google Cloud Platform zone where the instance exists.
+        :param zone: Google Cloud Platform zone where the instance exists
         :type zone: str
-        :param resource_id: Name of the Compute Engine instance resource.
+        :param resource_id: Name of the Compute Engine instance resource
         :type resource_id: str
-        :return: True if the operation succeeded, raises an error otherwise
+        :return: True if the operation succeeded, raises an error otherwise.
         :rtype: bool
         """
         response = self.get_conn().instances().stop(
@@ -105,50 +110,178 @@ def stop_instance(self, project_id, zone, resource_id):
             zone=zone,
             instance=resource_id
         ).execute(num_retries=NUM_RETRIES)
-        operation_name = response["name"]
-        return self._wait_for_operation_to_complete(project_id, zone, 
operation_name)
+        try:
+            operation_name = response["name"]
+        except KeyError:
+            raise AirflowException(
+                "Wrong response '{}' returned - it should contain "
+                "'name' field".format(response))
+        return self._wait_for_operation_to_complete(project_id, 
operation_name, zone)
 
     def set_machine_type(self, project_id, zone, resource_id, body):
         """
         Sets machine type of an instance defined by project_id, zone and 
resource_id.
 
-        :param project_id: Google Cloud Platform project where the Compute 
Engine
-        instance exists.
+        :param project_id: Google Cloud Platform project ID where the Compute 
Engine
+            Instance exists
         :type project_id: str
         :param zone: Google Cloud Platform zone where the instance exists.
         :type zone: str
-        :param resource_id: Name of the Compute Engine instance resource.
+        :param resource_id: Name of the Compute Engine instance resource
         :type resource_id: str
         :param body: Body required by the Compute Engine setMachineType API,
-        as described in
-        
https://cloud.google.com/compute/docs/reference/rest/v1/instances/setMachineType
+            as described in
+            
https://cloud.google.com/compute/docs/reference/rest/v1/instances/setMachineType
         :type body: dict
-        :return: True if the operation succeeded, raises an error otherwise
+        :return: True if the operation succeeded, raises an error otherwise.
         :rtype: bool
         """
         response = self._execute_set_machine_type(project_id, zone, 
resource_id, body)
-        operation_name = response["name"]
-        return self._wait_for_operation_to_complete(project_id, zone, 
operation_name)
+        try:
+            operation_name = response["name"]
+        except KeyError:
+            raise AirflowException(
+                "Wrong response '{}' returned - it should contain "
+                "'name' field".format(response))
+        return self._wait_for_operation_to_complete(project_id, 
operation_name, zone)
 
     def _execute_set_machine_type(self, project_id, zone, resource_id, body):
         return self.get_conn().instances().setMachineType(
             project=project_id, zone=zone, instance=resource_id, body=body)\
             .execute(num_retries=NUM_RETRIES)
 
-    def _wait_for_operation_to_complete(self, project_id, zone, 
operation_name):
+    def get_instance_template(self, project_id, resource_id):
         """
-        Waits for the named operation to complete - checks status of the
-        asynchronous call.
+        Retrieves instance template by project_id and resource_id.
+
+        :param project_id: Google Cloud Platform project ID where the Compute 
Engine
+            Instance template exists
+        :type project_id: str
+        :param resource_id: Name of the instance template
+        :type resource_id: str
+        :return: Instance template representation as object according to
+            
https://cloud.google.com/compute/docs/reference/rest/v1/instanceTemplates
+        :rtype: dict
+        """
+        response = self.get_conn().instanceTemplates().get(
+            project=project_id,
+            instanceTemplate=resource_id
+        ).execute(num_retries=NUM_RETRIES)
+        return response
+
+    def insert_instance_template(self, project_id, body, request_id=None):
+        """
+        Inserts instance template using body specified
+
+        :param project_id: Google Cloud Platform project ID where the Compute 
Engine
+            Instance exists
+        :type project_id: str
+        :param body: Instance template representation as object according to
+            
https://cloud.google.com/compute/docs/reference/rest/v1/instanceTemplates
+        :type body: dict
+        :param request_id: Optional, unique request_id that you might add to 
achieve
+            full idempotence (for example when client call times out repeating 
the request
+            with the same request id will not create a new instance template 
again)
+            It should be in UUID format as defined in RFC 4122
+        :type request_id: str
+        :return: True if the operation succeeded
+        :rtype: bool
+        """
+        response = self.get_conn().instanceTemplates().insert(
+            project=project_id,
+            body=body,
+            requestId=request_id
+        ).execute(num_retries=NUM_RETRIES)
+        try:
+            operation_name = response["name"]
+        except KeyError:
+            raise AirflowException(
+                "Wrong response '{}' returned - it should contain "
+                "'name' field".format(response))
+        return self._wait_for_operation_to_complete(project_id, operation_name)
+
+    def get_instance_group_manager(self, project_id, zone, resource_id):
+        """
+        Retrieves Instance Group Manager by project_id, zone and resource_id.
+
+        :param project_id: Google Cloud Platform project ID where the Compute 
Engine
+            Instance Group Manager exists
+        :type project_id: str
+        :param zone: Google Cloud Platform zone where the Instance Group 
Manager exists
+        :type zone: str
+        :param resource_id: Name of the Instance Group Manager
+        :type resource_id: str
+        :return: Instance group manager representation as object according to
+            
https://cloud.google.com/compute/docs/reference/rest/beta/instanceGroupManagers
+        :rtype: dict
+        """
+        response = self.get_conn().instanceGroupManagers().get(
+            project=project_id,
+            zone=zone,
+            instanceGroupManager=resource_id
+        ).execute(num_retries=NUM_RETRIES)
+        return response
+
+    def patch_instance_group_manager(self, project_id, zone, resource_id,
+                                     body, request_id=None):
+        """
+        Patches Instance Group Manager with the specified body.
+
+        :param project_id: Google Cloud Platform project ID where the Compute 
Engine
+            Instance Group Manager exists
+        :type project_id: str
+        :param zone: Google Cloud Platform zone where the Instance Group 
Manager exists
+        :type zone: str
+        :param resource_id: Name of the Instance Group Manager
+        :type resource_id: str
+        :param body: Instance Group Manager representation as json-merge-patch 
object
+            according to
+            
https://cloud.google.com/compute/docs/reference/rest/beta/instanceTemplates/patch
+        :type body: dict
+        :param request_id: Optional, unique request_id that you might add to 
achieve
+            full idempotence (for example when client call times out repeating 
the request
+            with the same request id will not create a new instance template 
again).
+            It should be in UUID format as defined in RFC 4122
+        :type request_id: str
+        :return: True if the operation succeeded
+        :rtype: bool
+        """
+        response = self.get_conn().instanceGroupManagers().patch(
+            project=project_id,
+            zone=zone,
+            instanceGroupManager=resource_id,
+            body=body,
+            requestId=request_id
+        ).execute(num_retries=NUM_RETRIES)
+        try:
+            operation_name = response["name"]
+        except KeyError:
+            raise AirflowException(
+                "Wrong response '{}' returned - it should contain "
+                "'name' field".format(response))
+        return self._wait_for_operation_to_complete(project_id, 
operation_name, zone)
+
+    def _wait_for_operation_to_complete(self, project_id, operation_name, 
zone=None):
+        """
+        Waits for the named operation to complete - checks status of the async 
call.
 
         :param operation_name: name of the operation
         :type operation_name: str
+        :param zone: optional region of the request (might be None for global 
operations)
+        :type zone: str
         :return: True if the operation succeeded, raises an error otherwise
         :rtype: bool
         """
         service = self.get_conn()
         while True:
-            operation_response = self._check_operation_status(
-                service, operation_name, project_id, zone)
+            if zone is None:
+                # noinspection PyTypeChecker
+                operation_response = self._check_global_operation_status(
+                    service, operation_name, project_id)
+            else:
+                # noinspection PyTypeChecker
+                operation_response = self._check_zone_operation_status(
+                    service, operation_name, project_id, zone)
             if operation_response.get("status") == GceOperationStatus.DONE:
                 error = operation_response.get("error")
                 if error:
@@ -161,7 +294,14 @@ def _wait_for_operation_to_complete(self, project_id, 
zone, operation_name):
                 return True
             time.sleep(TIME_TO_SLEEP_IN_SECONDS)
 
-    def _check_operation_status(self, service, operation_name, project_id, 
zone):
+    @staticmethod
+    def _check_zone_operation_status(service, operation_name, project_id, 
zone):
         return service.zoneOperations().get(
             project=project_id, zone=zone, operation=operation_name).execute(
             num_retries=NUM_RETRIES)
+
+    @staticmethod
+    def _check_global_operation_status(service, operation_name, project_id):
+        return service.globalOperations().get(
+            project=project_id, operation=operation_name).execute(
+            num_retries=NUM_RETRIES)
diff --git a/airflow/contrib/hooks/gcp_function_hook.py 
b/airflow/contrib/hooks/gcp_function_hook.py
index d89b5b0ec8..29cef1716c 100644
--- a/airflow/contrib/hooks/gcp_function_hook.py
+++ b/airflow/contrib/hooks/gcp_function_hook.py
@@ -65,7 +65,7 @@ def get_function(self, name):
 
         :param name: name of the function
         :type name: str
-        :return: a CloudFunction object representing the function
+        :return: a Cloud Functions object representing the function
         :rtype: dict
         """
         return self.get_conn().projects().locations().functions().get(
@@ -78,7 +78,7 @@ def list_functions(self, full_location):
         :param full_location: full location including the project in the form 
of
             of /projects/<PROJECT>/location/<LOCATION>
         :type full_location: str
-        :return: array of CloudFunction objects - representing functions in 
the location
+        :return: array of Cloud Functions objects - representing functions in 
the location
         :rtype: [dict]
         """
         list_response = 
self.get_conn().projects().locations().functions().list(
diff --git a/airflow/contrib/operators/gcp_compute_operator.py 
b/airflow/contrib/operators/gcp_compute_operator.py
index a2fd545294..a872c17227 100644
--- a/airflow/contrib/operators/gcp_compute_operator.py
+++ b/airflow/contrib/operators/gcp_compute_operator.py
@@ -16,18 +16,24 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from copy import deepcopy
+
+from googleapiclient.errors import HttpError
 
 from airflow import AirflowException
 from airflow.contrib.hooks.gcp_compute_hook import GceHook
+from airflow.contrib.utils.gcp_field_sanitizer import GcpBodyFieldSanitizer
 from airflow.contrib.utils.gcp_field_validator import GcpBodyFieldValidator
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
+from json_merge_patch import merge
 
 
 class GceBaseOperator(BaseOperator):
     """
     Abstract base operator for Google Compute Engine operators to inherit from.
     """
+
     @apply_defaults
     def __init__(self,
                  project_id,
@@ -61,10 +67,10 @@ def execute(self, context):
 
 class GceInstanceStartOperator(GceBaseOperator):
     """
-    Start an instance in Google Compute Engine.
+    Starts an instance in Google Compute Engine.
 
-    :param project_id: Google Cloud Platform project where the Compute Engine
-        instance exists.
+    :param project_id: Google Cloud Platform Project ID where the Compute 
Engine
+        Instance exists.
     :type project_id: str
     :param zone: Google Cloud Platform zone where the instance exists.
     :type zone: str
@@ -72,10 +78,12 @@ class GceInstanceStartOperator(GceBaseOperator):
     :type resource_id: str
     :param gcp_conn_id: The connection ID used to connect to Google Cloud 
Platform.
     :type gcp_conn_id: str
-    :param api_version: API version used (e.g. v1).
+    :param api_version: API version used (for example v1 or beta).
     :type api_version: str
     """
+    # [START gce_instance_start_template_fields]
     template_fields = ('project_id', 'zone', 'resource_id', 'gcp_conn_id', 
'api_version')
+    # [END gce_instance_start_template_fields]
 
     @apply_defaults
     def __init__(self,
@@ -95,10 +103,10 @@ def execute(self, context):
 
 class GceInstanceStopOperator(GceBaseOperator):
     """
-    Stop an instance in Google Compute Engine.
+    Stops an instance in Google Compute Engine.
 
-    :param project_id: Google Cloud Platform project where the Compute Engine
-        instance exists.
+    :param project_id: Google Cloud Platform Project ID where the Compute 
Engine
+        Instance exists.
     :type project_id: str
     :param zone: Google Cloud Platform zone where the instance exists.
     :type zone: str
@@ -106,10 +114,12 @@ class GceInstanceStopOperator(GceBaseOperator):
     :type resource_id: str
     :param gcp_conn_id: The connection ID used to connect to Google Cloud 
Platform.
     :type gcp_conn_id: str
-    :param api_version: API version used (e.g. v1).
+    :param api_version: API version used (for example v1 or beta).
     :type api_version: str
     """
+    # [START gce_instance_stop_template_fields]
     template_fields = ('project_id', 'zone', 'resource_id', 'gcp_conn_id', 
'api_version')
+    # [END gce_instance_stop_template_fields]
 
     @apply_defaults
     def __init__(self,
@@ -135,10 +145,10 @@ def execute(self, context):
 class GceSetMachineTypeOperator(GceBaseOperator):
     """
     Changes the machine type for a stopped instance to the machine type 
specified in
-    the request.
+        the request.
 
-    :param project_id: Google Cloud Platform project where the Compute Engine
-        instance exists.
+    :param project_id: Google Cloud Platform Project ID where the Compute 
Engine
+        Instance exists.
     :type project_id: str
     :param zone: Google Cloud Platform zone where the instance exists.
     :type zone: str
@@ -149,10 +159,14 @@ class GceSetMachineTypeOperator(GceBaseOperator):
     :type body: dict
     :param gcp_conn_id: The connection ID used to connect to Google Cloud 
Platform.
     :type gcp_conn_id: str
-    :param api_version: API version used (e.g. v1).
+    :param api_version: API version used (for example v1 or beta).
     :type api_version: str
+    :param validate_body: If set to False, body validation is not performed.
+    :type validate_body: bool
     """
+    # [START gce_instance_set_machine_type_template_fields]
     template_fields = ('project_id', 'zone', 'resource_id', 'gcp_conn_id', 
'api_version')
+    # [END gce_instance_set_machine_type_template_fields]
 
     @apply_defaults
     def __init__(self,
@@ -181,3 +195,241 @@ def execute(self, context):
         self._validate_all_body_fields()
         return self._hook.set_machine_type(self.project_id, self.zone,
                                            self.resource_id, self.body)
+
+
# Validation specification for the body_patch accepted by
# GceInstanceTemplateCopyOperator. Only the top-level "name" is required
# (the copy must be stored under a new template name); everything else is
# optional and several nested structures are deliberately not validated
# deeper (free-form arrays are accepted as-is).
GCE_INSTANCE_TEMPLATE_VALIDATION_PATCH_SPECIFICATION = [
    dict(name="name", regexp="^.+$"),
    dict(name="description", optional=True),
    dict(name="properties", type='dict', optional=True, fields=[
        dict(name="description", optional=True),
        dict(name="tags", optional=True, fields=[
            dict(name="items", optional=True)
        ]),
        dict(name="machineType", optional=True),
        dict(name="canIpForward", optional=True),
        dict(name="networkInterfaces", optional=True),  # not validating deeper
        dict(name="disks", optional=True),  # not validating the array deeper
        dict(name="metadata", optional=True, fields=[
            dict(name="fingerprint", optional=True),
            dict(name="items", optional=True),
            dict(name="kind", optional=True),
        ]),
        dict(name="serviceAccounts", optional=True),  # not validating deeper
        dict(name="scheduling", optional=True, fields=[
            dict(name="onHostMaintenance", optional=True),
            dict(name="automaticRestart", optional=True),
            dict(name="preemptible", optional=True),
            # Bug fix: the GCP API field is "nodeAffinities" - the previous
            # spelling "nodeAffinitites" could never match a real request body.
            dict(name="nodeAffinities", optional=True),  # not validating deeper
        ]),
        dict(name="labels", optional=True),
        dict(name="guestAccelerators", optional=True),  # not validating deeper
        dict(name="minCpuPlatform", optional=True),
    ]),
]
+
# Output-only / server-generated fields that must be stripped from an
# instance template fetched from GCP before its body can be reused in an
# insert request. Entries use '.'-separated paths to descend into nested
# dicts and into every element of nested arrays (see GcpBodyFieldSanitizer).
GCE_INSTANCE_TEMPLATE_FIELDS_TO_SANITIZE = [
    # top-level identity/metadata assigned by the server
    "kind",
    "id",
    "name",
    "creationTimestamp",
    # per-disk output-only fields
    "properties.disks.sha256",
    "properties.disks.kind",
    "properties.disks.sourceImageEncryptionKey.sha256",
    "properties.disks.index",
    "properties.disks.licenses",
    # network-interface output-only fields
    "properties.networkInterfaces.kind",
    "properties.networkInterfaces.accessConfigs.kind",
    "properties.networkInterfaces.name",
    "properties.metadata.kind",
    "selfLink"
]
+
+
class GceInstanceTemplateCopyOperator(GceBaseOperator):
    """
    Copies the instance template, applying specified changes.

    :param project_id: Google Cloud Platform Project ID where the Compute Engine
        instance exists.
    :type project_id: str
    :param resource_id: Name of the Instance Template
    :type resource_id: str
    :param body_patch: Patch to the body of instanceTemplates object following rfc7386
        PATCH semantics. The body_patch content follows
        https://cloud.google.com/compute/docs/reference/rest/v1/instanceTemplates
        Name field is required as we need to rename the template,
        all the other fields are optional. It is important to follow PATCH semantics
        - arrays are replaced fully, so if you need to update an array you should
        provide the whole target array as patch element.
    :type body_patch: dict
    :param request_id: Optional, unique request_id that you might add to achieve
        full idempotence (for example when client call times out repeating the request
        with the same request id will not create a new instance template again).
        It should be in UUID format as defined in RFC 4122.
    :type request_id: str
    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
    :type gcp_conn_id: str
    :param api_version: API version used (for example v1 or beta).
    :type api_version: str
    :param validate_body: If set to False, body validation is not performed.
    :type validate_body: bool
    """
    # [START gce_instance_template_copy_operator_template_fields]
    template_fields = ('project_id', 'resource_id', 'request_id',
                       'gcp_conn_id', 'api_version')
    # [END gce_instance_template_copy_operator_template_fields]

    @apply_defaults
    def __init__(self,
                 project_id,
                 resource_id,
                 body_patch,
                 request_id=None,
                 gcp_conn_id='google_cloud_default',
                 api_version='v1',
                 validate_body=True,
                 *args, **kwargs):
        self.body_patch = body_patch
        self.request_id = request_id
        self._field_validator = None
        # 'name' is mandatory: the copy must be stored under a new template name.
        if 'name' not in self.body_patch:
            raise AirflowException("The body '{}' should contain at least "
                                   "name for the new operator in the 'name' field".
                                   format(body_patch))
        if validate_body:
            self._field_validator = GcpBodyFieldValidator(
                GCE_INSTANCE_TEMPLATE_VALIDATION_PATCH_SPECIFICATION,
                api_version=api_version)
        self._field_sanitizer = GcpBodyFieldSanitizer(
            GCE_INSTANCE_TEMPLATE_FIELDS_TO_SANITIZE)
        # Instance templates are global resources, hence zone='global'.
        super(GceInstanceTemplateCopyOperator, self).__init__(
            project_id=project_id, zone='global', resource_id=resource_id,
            gcp_conn_id=gcp_conn_id, api_version=api_version, *args, **kwargs)

    def _validate_all_body_fields(self):
        # Validator is None when validate_body=False was passed to __init__.
        if self._field_validator:
            self._field_validator.validate(self.body_patch)

    def execute(self, context):
        self._validate_all_body_fields()
        try:
            # Idempotence check (sort of) - we want to check if the new template
            # is already created and if it is, then we assume it was created by a
            # previous run of the CopyTemplate operator - we do not check if the
            # content of the template is as expected. Templates are immutable so
            # we cannot update it anyway and deleting/recreating is not worth the
            # hassle especially that we cannot delete a template if it is already
            # used in some Instance Group Manager. We assume success if the
            # template is simply present.
            existing_template = self._hook.get_instance_template(
                project_id=self.project_id,
                resource_id=self.body_patch['name'])
            # Bug fix: the '{}' placeholder was never filled in - the original
            # logged the literal braces. Pass the template name to format().
            self.log.info("The {} template already existed. It was likely "
                          "created by previous run of the operator. Assuming "
                          "success.".format(self.body_patch['name']))
            return existing_template
        except HttpError as e:
            # We actually expect to get 404 / Not Found here as the template
            # should not yet exist
            if not e.resp.status == 404:
                raise e
        old_body = self._hook.get_instance_template(
            project_id=self.project_id,
            resource_id=self.resource_id)
        new_body = deepcopy(old_body)
        # Strip output-only fields from the fetched body before re-inserting,
        # then apply the rfc7386-style patch on top of it.
        self._field_sanitizer.sanitize(new_body)
        new_body = merge(new_body, self.body_patch)
        self.log.info("Calling insert instance template with updated body: {}".
                      format(new_body))
        self._hook.insert_instance_template(project_id=self.project_id,
                                            body=new_body,
                                            request_id=self.request_id)
        return self._hook.get_instance_template(
            project_id=self.project_id,
            resource_id=self.body_patch['name'])
+
+
class GceInstanceGroupManagerUpdateTemplateOperator(GceBaseOperator):
    """
    Patches the Instance Group Manager, replacing source template URL with the
    destination one. API V1 does not have update/patch operations for Instance
    Group Manager, so you must use beta or newer API version. Beta is the default.

    :param project_id: Google Cloud Platform Project ID where the Compute Engine
        Instance exists.
    :type project_id: str
    :param resource_id: Name of the Instance Group Manager
    :type resource_id: str
    :param zone: Google Cloud Platform zone where the Instance Group Manager exists.
    :type zone: str
    :param source_template: URL of the template to replace.
    :type source_template: str
    :param destination_template: URL of the target template to set.
    :type destination_template: str
    :param request_id: Optional, unique request_id that you might add to achieve
        full idempotence (for example when client call times out repeating the request
        with the same request id will not create a new instance template again).
        It should be in UUID format as defined in RFC 4122
    :type request_id: str
    :param update_policy: The update policy for this managed instance group. See
        https://cloud.google.com/compute/docs/reference/rest/beta/instanceGroupManagers/patch
        for details of the updatePolicy fields. It's an optional field.
    :type update_policy: dict
    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
    :type gcp_conn_id: str
    :param api_version: API version used (for example beta).
    :type api_version: str
    """
    # [START gce_igm_update_template_operator_template_fields]
    template_fields = ('project_id', 'resource_id', 'zone', 'request_id',
                       'source_template', 'destination_template',
                       'gcp_conn_id', 'api_version')
    # [END gce_igm_update_template_operator_template_fields]

    @apply_defaults
    def __init__(self,
                 project_id,
                 resource_id,
                 zone,
                 source_template,
                 destination_template,
                 update_policy=None,
                 request_id=None,
                 gcp_conn_id='google_cloud_default',
                 api_version='beta',
                 *args, **kwargs):
        self.zone = zone
        self.source_template = source_template
        self.destination_template = destination_template
        self.request_id = request_id
        self.update_policy = update_policy
        self._change_performed = False
        # Fail fast: v1 has no patch operation for Instance Group Managers.
        if api_version == 'v1':
            raise AirflowException("Api version v1 does not have update/patch "
                                   "operations for Instance Group Managers. Use beta"
                                   " api version or above")
        super(GceInstanceGroupManagerUpdateTemplateOperator, self).__init__(
            project_id=project_id, zone=self.zone, resource_id=resource_id,
            gcp_conn_id=gcp_conn_id, api_version=api_version, *args, **kwargs)

    def _possibly_replace_template(self, dictionary):
        # type: (dict) -> None
        # Swap the template URL in place and remember that a change happened
        # so execute() knows whether a patch call is needed at all.
        if dictionary.get('instanceTemplate') == self.source_template:
            dictionary['instanceTemplate'] = self.destination_template
            self._change_performed = True

    def execute(self, context):
        old_instance_group_manager = self._hook.get_instance_group_manager(
            project_id=self.project_id,
            zone=self.zone,
            resource_id=self.resource_id)
        # Build a minimal patch body with only the fields we may modify.
        patch_body = {}
        if 'versions' in old_instance_group_manager:
            patch_body['versions'] = old_instance_group_manager['versions']
        if 'instanceTemplate' in old_instance_group_manager:
            patch_body['instanceTemplate'] = \
                old_instance_group_manager['instanceTemplate']
        if self.update_policy:
            patch_body['updatePolicy'] = self.update_policy
        self._possibly_replace_template(patch_body)
        if 'versions' in patch_body:
            for version in patch_body['versions']:
                self._possibly_replace_template(version)
        if self._change_performed or self.update_policy:
            self.log.info("Calling patch instance template with updated body: {}".
                          format(patch_body))
            return self._hook.patch_instance_group_manager(
                project_id=self.project_id, zone=self.zone,
                resource_id=self.resource_id,
                body=patch_body, request_id=self.request_id)
        else:
            # Idempotence achieved: nothing referenced the source template,
            # so there is nothing to patch.
            return True
diff --git a/airflow/contrib/operators/gcp_function_operator.py 
b/airflow/contrib/operators/gcp_function_operator.py
index c0013aaea9..7f7da1d3ec 100644
--- a/airflow/contrib/operators/gcp_function_operator.py
+++ b/airflow/contrib/operators/gcp_function_operator.py
@@ -93,7 +93,7 @@ class GcfFunctionDeployOperator(BaseOperator):
         . Different API versions require different variants of the Cloud 
Functions
         dictionary.
     :type body: dict or google.cloud.functions.v1.CloudFunction
-    :param gcp_conn_id: The connection ID to use to connect to Google Cloud 
Platform.
+    :param gcp_conn_id: The connection ID used to connect to Google Cloud 
Platform.
     :type gcp_conn_id: str
     :param api_version: API version used (for example v1 or v1beta1).
     :type api_version: str
@@ -105,6 +105,9 @@ class GcfFunctionDeployOperator(BaseOperator):
     :param validate_body: If set to False, body validation is not performed.
     :type validate_body: bool
     """
+    # [START gce_function_deploy_template_operator_template_fields]
+    template_fields = ('project_id', 'location', 'gcp_conn_id', 'api_version')
+    # [END gce_function_deploy_template_operator_template_fields]
 
     @apply_defaults
     def __init__(self,
@@ -276,6 +279,9 @@ class GcfFunctionDeleteOperator(BaseOperator):
     :param api_version: API version used (for example v1 or v1beta1).
     :type api_version: str
     """
+    # [START gce_function_delete_template_operator_template_fields]
+    template_fields = ('name', 'gcp_conn_id', 'api_version')
+    # [END gce_function_delete_template_operator_template_fields]
 
     @apply_defaults
     def __init__(self,
diff --git a/airflow/contrib/utils/gcp_field_sanitizer.py 
b/airflow/contrib/utils/gcp_field_sanitizer.py
new file mode 100644
index 0000000000..c0a8985281
--- /dev/null
+++ b/airflow/contrib/utils/gcp_field_sanitizer.py
@@ -0,0 +1,162 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Sanitizer for body fields sent via GCP API.
+
+The sanitizer removes fields specified from the body.
+
+Context
+-------
+In some cases where GCP operation requires modification of existing resources 
(such
+as instances or instance templates) we need to sanitize body of the resources 
returned
+via GCP APIs. This is in the case when we retrieve information from GCP first,
+modify the body and either update the existing resource or create a new one 
with the
+modified body. Usually when you retrieve resource from GCP you get some extra 
fields which
+are Output-only, and we need to delete those fields if we want to use
+the body as input for subsequent create/insert type operation.
+
+
+Field specification
+-------------------
+
+Specification of fields is an array of strings which denote names of fields to 
be removed.
+The field can be either direct field name to remove from the body or the full
+specification of the path you should delete - separated with '.'
+
+
+>>> FIELDS_TO_SANITIZE = [
+>>>    "kind",
+>>>    "properties.disks.kind",
+>>>    "properties.metadata.kind",
+>>>]
+>>> body = {
+>>>     "kind": "compute#instanceTemplate",
+>>>     "name": "instance",
+>>>     "properties": {
+>>>         "disks": [
+>>>             {
+>>>                 "name": "a",
+>>>                 "kind": "compute#attachedDisk",
+>>>                 "type": "PERSISTENT",
+>>>                 "mode": "READ_WRITE",
+>>>             },
+>>>             {
+>>>                 "name": "b",
+>>>                 "kind": "compute#attachedDisk",
+>>>                 "type": "PERSISTENT",
+>>>                 "mode": "READ_WRITE",
+>>>             }
+>>>         ],
+>>>         "metadata": {
+>>>             "kind": "compute#metadata",
+>>>             "fingerprint": "GDPUYxlwHe4="
+>>>         },
+>>>     }
+>>> }
+>>> sanitizer=GcpBodyFieldSanitizer(FIELDS_TO_SANITIZE)
+>>> SANITIZED_BODY = sanitizer.sanitize(body)
+>>> json.dumps(SANITIZED_BODY, indent=2)
+{
+    "name":  "instance",
+    "properties": {
+        "disks": [
+            {
+                "name": "a",
+                "type": "PERSISTENT",
+                "mode": "READ_WRITE",
+            },
+            {
+                "name": "b",
+                "type": "PERSISTENT",
+                "mode": "READ_WRITE",
+            }
+        ],
+        "metadata": {
+            "fingerprint": "GDPUYxlwHe4="
+        },
+    }
+}
+
+Note that the components of the path can be either dictionaries or arrays of
+dictionaries. When a component resolves to a dictionary, the next path component
+names a key of that dictionary; when it resolves to an array, the sanitizer
+iterates through all dictionaries in the array and searches for the remaining
+components in each element of the array.
+"""
+
+from airflow import LoggingMixin, AirflowException
+
+
class GcpFieldSanitizerException(AirflowException):
    """Raised when the sanitizer encounters a field of an unexpected type
    along the path - anything other than a dict or an array.
    """

    def __init__(self, message):
        super(GcpFieldSanitizerException, self).__init__(message)
+
+
class GcpBodyFieldSanitizer(LoggingMixin):
    """Sanitizes the body according to specification.

    Removes every field listed in sanitize_specs from a body dict in place.
    Each spec is a '.'-separated path; intermediate components may resolve to
    dicts (descend by key) or lists of dicts (descend into every element).

    :param sanitize_specs: array of strings that specifies which fields to remove
    :type sanitize_specs: [string]

    """
    def __init__(self, sanitize_specs):
        # type: ([str]) -> None
        super(GcpBodyFieldSanitizer, self).__init__()
        self._sanitize_specs = sanitize_specs

    def _sanitize(self, dictionary, remaining_field_spec, current_path):
        # Recursively walk remaining_field_spec and delete the leaf field.
        # current_path is only used to produce readable log messages.
        field_split = remaining_field_spec.split(".", 1)
        if len(field_split) == 1:
            # Leaf of the path - delete the field if it is present.
            field_name = field_split[0]
            if field_name in dictionary:
                self.log.info("Deleted {} [{}]".format(field_name, current_path))
                del dictionary[field_name]
            else:
                self.log.debug("The field {} is missing in {} at the path {}.".
                               format(field_name, dictionary, current_path))
        else:
            field_name = field_split[0]
            remaining_path = field_split[1]
            child = dictionary.get(field_name)
            if child is None:
                self.log.debug("The field {} is missing in {} at the path {}. ".
                               format(field_name, dictionary, current_path))
            elif isinstance(child, dict):
                self._sanitize(child, remaining_path, "{}.{}".format(
                    current_path, field_name))
            elif isinstance(child, list):
                for index, elem in enumerate(child):
                    if not isinstance(elem, dict):
                        self.log.warn(
                            "The field {} element at index {} is of wrong type. "
                            "It should be dict and is {}. Skipping it.".
                            format(current_path, index, elem))
                        # Bug fix: actually skip the non-dict element as the
                        # log message promises - the original fell through and
                        # recursed into it, which raises for e.g. strings.
                        continue
                    self._sanitize(elem, remaining_path, "{}.{}[{}]".format(
                        current_path, field_name, index))
            else:
                self.log.warn(
                    "The field {} is of wrong type. "
                    "It should be dict or list and it is {}. Skipping it.".
                    format(current_path, child))

    def sanitize(self, body):
        """Apply every sanitize spec to *body*, mutating it in place."""
        for elem in self._sanitize_specs:
            self._sanitize(body, elem, "")
diff --git a/docs/howto/operator.rst b/docs/howto/operator.rst
index 6333e32dd7..806eaa99d9 100644
--- a/docs/howto/operator.rst
+++ b/docs/howto/operator.rst
@@ -86,8 +86,8 @@ template variables <macros>` and a ``templates_dict`` 
argument.
 The ``templates_dict`` argument is templated, so each value in the dictionary
 is evaluated as a :ref:`Jinja template <jinja-templating>`.
 
-Google Cloud Platform Operators
--------------------------------
+Google Cloud Storage Operators
+------------------------------
 
 GoogleCloudStorageToBigQueryOperator
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
@@ -102,22 +102,31 @@ to execute a BigQuery load job.
     :start-after: [START howto_operator_gcs_to_bq]
     :end-before: [END howto_operator_gcs_to_bq]
 
+
+Google Compute Engine Operators
+-------------------------------
+
 GceInstanceStartOperator
 ^^^^^^^^^^^^^^^^^^^^^^^^
 
-Allows to start an existing Google Compute Engine instance.
+Use the
+:class:`~airflow.contrib.operators.gcp_compute_operator.GceInstanceStartOperator`
+to start an existing Google Compute Engine instance.
 
-In this example parameter values are extracted from Airflow variables.
-Moreover, the ``default_args`` dict is used to pass common arguments to all 
operators in a single DAG.
+
+Arguments
+"""""""""
+
+The following examples of OS environment variables show how you can build 
parameters
+used by the operator and build default args to pass them to multiple tasks:
 
 .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute.py
     :language: python
-    :start-after: [START howto_operator_gce_args]
-    :end-before: [END howto_operator_gce_args]
+    :start-after: [START howto_operator_gce_args_common]
+    :end-before: [END howto_operator_gce_args_common]
 
-
-Define the :class:`~airflow.contrib.operators.gcp_compute_operator
-.GceInstanceStartOperator` by passing the required arguments to the 
constructor.
+Using the operator
+""""""""""""""""""
 
 .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute.py
     :language: python
@@ -125,15 +134,42 @@ Define the 
:class:`~airflow.contrib.operators.gcp_compute_operator
     :start-after: [START howto_operator_gce_start]
     :end-before: [END howto_operator_gce_start]
 
+Templating
+""""""""""
+
+.. literalinclude:: ../../airflow/contrib/operators/gcp_compute_operator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START gce_instance_start_template_fields]
+    :end-before: [END gce_instance_start_template_fields]
+
+More information
+""""""""""""""""
+
+See `Google Compute Engine API documentation 
<https://cloud.google.com/compute/docs/reference/rest/v1/instances/start>`_
+
+
 GceInstanceStopOperator
 ^^^^^^^^^^^^^^^^^^^^^^^
 
-Allows to stop an existing Google Compute Engine instance.
+Use the operator to stop Google Compute Engine instance.
+
+For parameter definition take a look at
+:class:`~airflow.contrib.operators.gcp_compute_operator.GceInstanceStopOperator`
 
-For parameter definition take a look at 
:class:`~airflow.contrib.operators.gcp_compute_operator.GceInstanceStartOperator`
 above.
+Arguments
+"""""""""
 
-Define the :class:`~airflow.contrib.operators.gcp_compute_operator
-.GceInstanceStopOperator` by passing the required arguments to the constructor.
+The following examples of OS environment variables show how you can build 
parameters
+used by the operator and build default args to pass them to multiple tasks:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute.py
+   :language: python
+   :start-after: [START howto_operator_gce_args_common]
+   :end-before: [END howto_operator_gce_args_common]
+
+Using the operator
+""""""""""""""""""
 
 .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute.py
     :language: python
@@ -141,15 +177,48 @@ Define the 
:class:`~airflow.contrib.operators.gcp_compute_operator
     :start-after: [START howto_operator_gce_stop]
     :end-before: [END howto_operator_gce_stop]
 
+Templating
+""""""""""
+
+.. literalinclude:: ../../airflow/contrib/operators/gcp_compute_operator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START gce_instance_stop_template_fields]
+    :end-before: [END gce_instance_stop_template_fields]
+
+More information
+""""""""""""""""
+
+See `Google Compute Engine API documentation 
<https://cloud.google.com/compute/docs/reference/rest/v1/instances/stop>`_
+
+
 GceSetMachineTypeOperator
 ^^^^^^^^^^^^^^^^^^^^^^^^^
 
-Allows to change the machine type for a stopped instance to the specified 
machine type.
+Use the operator to change machine type of a Google Compute Engine instance.
 
-For parameter definition take a look at 
:class:`~airflow.contrib.operators.gcp_compute_operator.GceInstanceStartOperator`
 above.
+For parameter definition take a look at
+:class:`~airflow.contrib.operators.gcp_compute_operator.GceSetMachineTypeOperator`
 
-Define the :class:`~airflow.contrib.operators.gcp_compute_operator
-.GceSetMachineTypeOperator` by passing the required arguments to the 
constructor.
+Arguments
+"""""""""
+
+The following examples of OS environment variables show how you can build 
parameters
+used by the operator and build default args to pass them to multiple tasks:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute.py
+    :language: python
+    :start-after: [START howto_operator_gce_args_common]
+    :end-before: [END howto_operator_gce_args_common]
+
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute.py
+    :language: python
+    :start-after: [START howto_operator_gce_args_set_machine_type]
+    :end-before: [END howto_operator_gce_args_set_machine_type]
+
+Using the operator
+""""""""""""""""""
 
 .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute.py
     :language: python
@@ -157,26 +226,163 @@ Define the 
:class:`~airflow.contrib.operators.gcp_compute_operator
     :start-after: [START howto_operator_gce_set_machine_type]
     :end-before: [END howto_operator_gce_set_machine_type]
 
+Templating
+""""""""""
+
+.. literalinclude:: ../../airflow/contrib/operators/gcp_compute_operator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START gce_instance_set_machine_type_template_fields]
+    :end-before: [END gce_instance_set_machine_type_template_fields]
+
+More information
+""""""""""""""""
+
+See `Google Compute Engine API documentation 
<https://cloud.google.com/compute/docs/reference/rest/v1/instances/setMachineType>`_
+
+
+GceInstanceTemplateCopyOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Use the operator to copy an existing Google Compute Engine instance template
+applying a patch to it.
+
+For parameter definition take a look at
+:class:`~airflow.contrib.operators.gcp_compute_operator.GceInstanceTemplateCopyOperator`.
+
+Arguments
+"""""""""
+
+The following examples of OS environment variables show how you can build 
parameters
+passed to the operator and build default args to pass them to multiple tasks:
+
+.. literalinclude:: 
../../airflow/contrib/example_dags/example_gcp_compute_igm.py
+    :language: python
+    :start-after: [START howto_operator_compute_igm_common_args]
+    :end-before: [END howto_operator_compute_igm_common_args]
+
+.. literalinclude:: 
../../airflow/contrib/example_dags/example_gcp_compute_igm.py
+    :language: python
+    :start-after: [START howto_operator_compute_template_copy_args]
+    :end-before: [END howto_operator_compute_template_copy_args]
+
+Using the operator
+""""""""""""""""""
+
+.. literalinclude:: 
../../airflow/contrib/example_dags/example_gcp_compute_igm.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_gce_igm_copy_template]
+    :end-before: [END howto_operator_gce_igm_copy_template]
+
+Templating
+""""""""""
+
+.. literalinclude:: ../../airflow/contrib/operators/gcp_compute_operator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START gce_instance_template_copy_operator_template_fields]
+    :end-before: [END gce_instance_template_copy_operator_template_fields]
+
+More information
+""""""""""""""""
+
+See `Google Compute Engine API documentation 
<https://cloud.google.com/compute/docs/reference/rest/v1/instanceTemplates>`_
+
+GceInstanceGroupManagerUpdateTemplateOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Use the operator to update template in Google Compute Engine Instance Group 
Manager.
+
+For parameter definition take a look at
+:class:`~airflow.contrib.operators.gcp_compute_operator.GceInstanceGroupManagerUpdateTemplateOperator`.
+
+Arguments
+"""""""""
+
+The following examples of OS environment variables show how you can build 
parameters
+passed to the operator and build default args to pass them to multiple tasks:
+
+.. literalinclude:: 
../../airflow/contrib/example_dags/example_gcp_compute_igm.py
+    :language: python
+    :start-after: [START howto_operator_compute_igm_common_args]
+    :end-before: [END howto_operator_compute_igm_common_args]
+
+.. literalinclude:: 
../../airflow/contrib/example_dags/example_gcp_compute_igm.py
+    :language: python
+    :start-after: [START howto_operator_compute_igm_update_template_args]
+    :end-before: [END howto_operator_compute_igm_update_template_args]
+
+Using the operator
+""""""""""""""""""
+
+.. literalinclude:: 
../../airflow/contrib/example_dags/example_gcp_compute_igm.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_gce_igm_update_template]
+    :end-before: [END howto_operator_gce_igm_update_template]
+
+Templating
+""""""""""
+
+.. literalinclude:: ../../airflow/contrib/operators/gcp_compute_operator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START gce_igm_update_template_operator_template_fields]
+    :end-before: [END gce_igm_update_template_operator_template_fields]
+
+Troubleshooting
+"""""""""""""""
+
+You might find that your GceInstanceGroupManagerUpdateTemplateOperator fails 
with
+missing permissions. The service account has to have Service Account User role 
assigned
+via IAM permissions in order to execute the operation.
+
+More information
+""""""""""""""""
+
+See `Google Compute Engine API documentation 
<https://cloud.google.com/compute/docs/reference/rest/v1/instanceGroupManagers>`_
+
+Google Cloud Functions Operators
+--------------------------------
 
 GcfFunctionDeleteOperator
 ^^^^^^^^^^^^^^^^^^^^^^^^^
 
-Use the ``default_args`` dict to pass arguments to the operator.
+Use the operator to delete a function from Google Cloud Functions.
+
+For parameter definition take a look at
+:class:`~airflow.contrib.operators.gcp_function_operator.GcfFunctionDeleteOperator`.
+
+Arguments
+"""""""""
+
+The following examples of OS environment variables show how you can build 
function name
+to use in the operator and build default args to pass them to multiple tasks:
 
 .. literalinclude:: 
../../airflow/contrib/example_dags/example_gcp_function_delete.py
     :language: python
     :start-after: [START howto_operator_gcf_delete_args]
     :end-before: [END howto_operator_gcf_delete_args]
 
-
-Use the 
:class:`~airflow.contrib.operators.gcp_function_operator.GcfFunctionDeleteOperator`
-to delete a function from Google Cloud Functions.
+Using the operator
+""""""""""""""""""
 
 .. literalinclude:: 
../../airflow/contrib/example_dags/example_gcp_function_delete.py
     :language: python
+    :dedent: 4
     :start-after: [START howto_operator_gcf_delete]
     :end-before: [END howto_operator_gcf_delete]
 
+Templating
+""""""""""
+
+.. literalinclude:: ../../airflow/contrib/operators/gcp_function_operator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START gce_function_delete_template_operator_template_fields]
+    :end-before: [END gce_function_delete_template_operator_template_fields]
+
 Troubleshooting
 """""""""""""""
 If you want to run or deploy an operator using a service account and get 
“forbidden 403”
@@ -191,7 +397,6 @@ The typical way of assigning Cloud IAM permissions with 
`gcloud` is
 shown below. Just replace PROJECT_ID with ID of your Google Cloud Platform 
project
 and SERVICE_ACCOUNT_EMAIL with the email ID of your service account.
 
-
 .. code-block:: bash
 
   gcloud iam service-accounts add-iam-policy-binding \
@@ -202,13 +407,24 @@ and SERVICE_ACCOUNT_EMAIL with the email ID of your 
service account.
 
 See `Adding the IAM service agent user role to the runtime service 
<https://cloud.google.com/functions/docs/reference/iam/roles#adding_the_iam_service_agent_user_role_to_the_runtime_service_account>`_
  for details
 
+More information
+""""""""""""""""
+
+See `Google Cloud Functions API documentation 
<https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions/delete>`_
+
 GcfFunctionDeployOperator
 ^^^^^^^^^^^^^^^^^^^^^^^^^
 
-Use the 
:class:`~airflow.contrib.operators.gcp_function_operator.GcfFunctionDeployOperator`
-to deploy a function from Google Cloud Functions.
+Use the operator to deploy a function to Google Cloud Functions.
+
+For parameter definition take a look at
+:class:`~airflow.contrib.operators.gcp_function_operator.GcfFunctionDeployOperator`.
+
 
-The following examples of Airflow variables show various variants and 
combinations
+Arguments
+"""""""""
+
+The following examples of OS environment variables show various variants and 
combinations
 of default_args that you can use. The variables are defined as follows:
 
 .. literalinclude:: 
../../airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
@@ -223,11 +439,12 @@ With those variables you can define the body of the 
request:
     :start-after: [START howto_operator_gcf_deploy_body]
     :end-before: [END howto_operator_gcf_deploy_body]
 
-When you create a DAG, the default_args dictionary can be used to pass the 
body and
-other arguments:
+When you create a DAG, the default_args dictionary can be used to pass
+arguments common with other tasks:
 
 .. literalinclude:: 
../../airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
     :language: python
+    :dedent: 4
     :start-after: [START howto_operator_gcf_deploy_args]
     :end-before: [END howto_operator_gcf_deploy_args]
 
@@ -235,11 +452,14 @@ Note that the neither the body nor the default args are 
complete in the above ex
 Depending on the set variables, there might be different variants on how to 
pass source
 code related fields. Currently, you can pass either sourceArchiveUrl, 
sourceRepository
 or sourceUploadUrl as described in the
-`CloudFunction API specification 
<https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions#CloudFunction>`_.
+`Cloud Functions API specification 
<https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions#CloudFunction>`_.
 Additionally, default_args might contain zip_path parameter to run the extra 
step of
 uploading the source code before deploying it. In the last case, you also need 
to
 provide an empty `sourceUploadUrl` parameter in the body.
 
+Using the operator
+""""""""""""""""""
+
 Based on the variables defined above, example logic of setting the source code
 related fields is shown here:
 
@@ -252,9 +472,20 @@ The code to create the operator:
 
 .. literalinclude:: 
../../airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
     :language: python
+    :dedent: 4
     :start-after: [START howto_operator_gcf_deploy]
     :end-before: [END howto_operator_gcf_deploy]
 
+Templating
+""""""""""
+
+.. literalinclude:: ../../airflow/contrib/operators/gcp_function_operator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START gce_function_deploy_template_operator_template_fields]
+    :end-before: [END gce_function_deploy_template_operator_template_fields]
+
+
 Troubleshooting
 """""""""""""""
 
@@ -277,13 +508,20 @@ and SERVICE_ACCOUNT_EMAIL with the email ID of your 
service account.
     --member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \
     --role="roles/iam.serviceAccountUser"
 
-
 See `Adding the IAM service agent user role to the runtime service 
<https://cloud.google.com/functions/docs/reference/iam/roles#adding_the_iam_service_agent_user_role_to_the_runtime_service_account>`_
  for details
 
 If the source code for your function is in Google Source Repository, make sure 
that
 your service account has the Source Repository Viewer role so that the source 
code
 can be downloaded if necessary.
 
+More information
+""""""""""""""""
+
+See `Google Cloud Functions API documentation 
<https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions/create>`_
+
+Google Cloud SQL Operators
+--------------------------
+
 CloudSqlInstanceDatabaseCreateOperator
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
diff --git a/docs/integration.rst b/docs/integration.rst
index a0445409e7..46ea436bd7 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -100,28 +100,28 @@ field (see connection `wasb_default` for an example).
 .. _WasbBlobSensor:
 
 WasbBlobSensor
-"""""""""""""""
+""""""""""""""
 
 .. autoclass:: airflow.contrib.sensors.wasb_sensor.WasbBlobSensor
 
 .. _WasbPrefixSensor:
 
 WasbPrefixSensor
-"""""""""""""""""
+""""""""""""""""
 
 .. autoclass:: airflow.contrib.sensors.wasb_sensor.WasbPrefixSensor
 
 .. _FileToWasbOperator:
 
 FileToWasbOperator
-"""""""""""""""""""
+""""""""""""""""""
 
 .. autoclass:: airflow.contrib.operators.file_to_wasb.FileToWasbOperator
 
 .. _WasbHook:
 
 WasbHook
-"""""""""
+""""""""
 
 .. autoclass:: airflow.contrib.hooks.wasb_hook.WasbHook
 
@@ -582,6 +582,16 @@ Compute Engine Operators
 - :ref:`GceInstanceStartOperator` : start an existing Google Compute Engine 
instance.
 - :ref:`GceInstanceStopOperator` : stop an existing Google Compute Engine 
instance.
 - :ref:`GceSetMachineTypeOperator` : change the machine type for a stopped 
instance.
+- :ref:`GceInstanceTemplateCopyOperator` : copy the Instance Template, applying
+  specified changes.
+- :ref:`GceInstanceGroupManagerUpdateTemplateOperator` : patch the Instance 
Group Manager,
+  replacing source Instance Template URL with the destination one.
+
+The operators have common base operator:
+
+.. autoclass:: airflow.contrib.operators.gcp_compute_operator.GceBaseOperator
+
+They also use :ref:`GceHook` hook to communicate with Google Cloud Platform.
 
 .. _GceInstanceStartOperator:
 
@@ -604,6 +614,28 @@ GceSetMachineTypeOperator
 
 .. autoclass:: 
airflow.contrib.operators.gcp_compute_operator.GceSetMachineTypeOperator
 
+.. _GceInstanceTemplateCopyOperator:
+
+GceInstanceTemplateCopyOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: 
airflow.contrib.operators.gcp_compute_operator.GceInstanceTemplateCopyOperator
+
+.. _GceInstanceGroupManagerUpdateTemplateOperator:
+
+GceInstanceGroupManagerUpdateTemplateOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: 
airflow.contrib.operators.gcp_compute_operator.GceInstanceGroupManagerUpdateTemplateOperator
+
+.. _GceHook:
+
+Compute Engine Hook
+"""""""""""""""""""
+
+.. autoclass:: airflow.contrib.hooks.gcp_compute_hook.GceHook
+    :members:
+
 
 Cloud Functions
 '''''''''''''''
@@ -616,6 +648,8 @@ Cloud Functions Operators
 
 .. autoclass:: airflow.contrib.operators.gcp_operator.GCP
 
+They also use :ref:`GcfHook` hook to communicate with Google Cloud Platform.
+
 .. _GcfFunctionDeployOperator:
 
 GcfFunctionDeployOperator
@@ -632,6 +666,8 @@ GcfFunctionDeleteOperator
 .. autoclass:: 
airflow.contrib.operators.gcp_function_operator.GcfFunctionDeleteOperator
 
 
+.. _GcfHook:
+
 Cloud Functions Hook
 """"""""""""""""""""
 
@@ -741,7 +777,7 @@ DataprocClusterCreateOperator
 .. _DataprocClusterScaleOperator:
 
 DataprocClusterScaleOperator
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
 .. autoclass:: 
airflow.contrib.operators.dataproc_operator.DataprocClusterScaleOperator
 
diff --git a/setup.py b/setup.py
index d093d4971f..b337cd5e22 100644
--- a/setup.py
+++ b/setup.py
@@ -309,6 +309,7 @@ def do_setup():
             'gitpython>=2.0.2',
             'gunicorn>=19.4.0, <20.0',
             'iso8601>=0.1.12',
+            'json-merge-patch==0.2',
             'jinja2>=2.7.3, <2.9.0',
             'lxml>=4.0.0',
             'markdown>=2.5.2, <3.0',
diff --git a/tests/contrib/operators/test_gcp_compute_operator.py 
b/tests/contrib/operators/test_gcp_compute_operator.py
index 449c4e015f..e8f9bf0165 100644
--- a/tests/contrib/operators/test_gcp_compute_operator.py
+++ b/tests/contrib/operators/test_gcp_compute_operator.py
@@ -18,10 +18,15 @@
 # under the License.
 import ast
 import unittest
+from copy import deepcopy
+
+import httplib2
+from googleapiclient.errors import HttpError
 
 from airflow import AirflowException, configuration
 from airflow.contrib.operators.gcp_compute_operator import 
GceInstanceStartOperator, \
-    GceInstanceStopOperator, GceSetMachineTypeOperator
+    GceInstanceStopOperator, GceSetMachineTypeOperator, 
GceInstanceTemplateCopyOperator, \
+    GceInstanceGroupManagerUpdateTemplateOperator
 from airflow.models import TaskInstance, DAG
 from airflow.utils import timezone
 
@@ -34,12 +39,14 @@
     except ImportError:
         mock = None
 
+EMPTY_CONTENT = ''.encode('utf8')
+
 PROJECT_ID = 'project-id'
-LOCATION = 'zone'
+ZONE = 'zone'
 RESOURCE_ID = 'resource-id'
 SHORT_MACHINE_TYPE_NAME = 'n1-machine-type'
 SET_MACHINE_TYPE_BODY = {
-    'machineType': 'zones/{}/machineTypes/{}'.format(LOCATION, 
SHORT_MACHINE_TYPE_NAME)
+    'machineType': 'zones/{}/machineTypes/{}'.format(ZONE, 
SHORT_MACHINE_TYPE_NAME)
 }
 
 DEFAULT_DATE = timezone.datetime(2017, 1, 1)
@@ -51,7 +58,7 @@ def test_instance_start(self, mock_hook):
         mock_hook.return_value.start_instance.return_value = True
         op = GceInstanceStartOperator(
             project_id=PROJECT_ID,
-            zone=LOCATION,
+            zone=ZONE,
             resource_id=RESOURCE_ID,
             task_id='id'
         )
@@ -59,11 +66,11 @@ def test_instance_start(self, mock_hook):
         mock_hook.assert_called_once_with(api_version='v1',
                                           gcp_conn_id='google_cloud_default')
         mock_hook.return_value.start_instance.assert_called_once_with(
-            PROJECT_ID, LOCATION, RESOURCE_ID
+            PROJECT_ID, ZONE, RESOURCE_ID
         )
         self.assertTrue(result)
 
-    # Setting all of the operator's input parameters as templated dag_ids
+    # Setting all of the operator's input parameters as template dag_ids
     # (could be anything else) just to test if the templating works for all 
fields
     @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
     def test_instance_start_with_templates(self, mock_hook):
@@ -95,7 +102,7 @@ def test_start_should_throw_ex_when_missing_project_id(self, 
mock_hook):
         with self.assertRaises(AirflowException) as cm:
             op = GceInstanceStartOperator(
                 project_id="",
-                zone=LOCATION,
+                zone=ZONE,
                 resource_id=RESOURCE_ID,
                 task_id='id'
             )
@@ -123,7 +130,7 @@ def 
test_start_should_throw_ex_when_missing_resource_id(self, mock_hook):
         with self.assertRaises(AirflowException) as cm:
             op = GceInstanceStartOperator(
                 project_id=PROJECT_ID,
-                zone=LOCATION,
+                zone=ZONE,
                 resource_id="",
                 task_id='id'
             )
@@ -132,12 +139,14 @@ def 
test_start_should_throw_ex_when_missing_resource_id(self, mock_hook):
         self.assertIn("The required parameter 'resource_id' is missing", 
str(err))
         mock_hook.assert_not_called()
 
+
+class GceInstanceStopTest(unittest.TestCase):
     @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
     def test_instance_stop(self, mock_hook):
         mock_hook.return_value.stop_instance.return_value = True
         op = GceInstanceStopOperator(
             project_id=PROJECT_ID,
-            zone=LOCATION,
+            zone=ZONE,
             resource_id=RESOURCE_ID,
             task_id='id'
         )
@@ -145,7 +154,7 @@ def test_instance_stop(self, mock_hook):
         mock_hook.assert_called_once_with(api_version='v1',
                                           gcp_conn_id='google_cloud_default')
         mock_hook.return_value.stop_instance.assert_called_once_with(
-            PROJECT_ID, LOCATION, RESOURCE_ID
+            PROJECT_ID, ZONE, RESOURCE_ID
         )
         self.assertTrue(result)
 
@@ -181,7 +190,7 @@ def test_stop_should_throw_ex_when_missing_project_id(self, 
mock_hook):
         with self.assertRaises(AirflowException) as cm:
             op = GceInstanceStopOperator(
                 project_id="",
-                zone=LOCATION,
+                zone=ZONE,
                 resource_id=RESOURCE_ID,
                 task_id='id'
             )
@@ -209,7 +218,7 @@ def 
test_stop_should_throw_ex_when_missing_resource_id(self, mock_hook):
         with self.assertRaises(AirflowException) as cm:
             op = GceInstanceStopOperator(
                 project_id=PROJECT_ID,
-                zone=LOCATION,
+                zone=ZONE,
                 resource_id="",
                 task_id='id'
             )
@@ -218,12 +227,14 @@ def 
test_stop_should_throw_ex_when_missing_resource_id(self, mock_hook):
         self.assertIn("The required parameter 'resource_id' is missing", 
str(err))
         mock_hook.assert_not_called()
 
+
+class GceInstanceSetMachineTypeTest(unittest.TestCase):
     @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
     def test_set_machine_type(self, mock_hook):
         mock_hook.return_value.set_machine_type.return_value = True
         op = GceSetMachineTypeOperator(
             project_id=PROJECT_ID,
-            zone=LOCATION,
+            zone=ZONE,
             resource_id=RESOURCE_ID,
             body=SET_MACHINE_TYPE_BODY,
             task_id='id'
@@ -232,7 +243,7 @@ def test_set_machine_type(self, mock_hook):
         mock_hook.assert_called_once_with(api_version='v1',
                                           gcp_conn_id='google_cloud_default')
         mock_hook.return_value.set_machine_type.assert_called_once_with(
-            PROJECT_ID, LOCATION, RESOURCE_ID, SET_MACHINE_TYPE_BODY
+            PROJECT_ID, ZONE, RESOURCE_ID, SET_MACHINE_TYPE_BODY
         )
         self.assertTrue(result)
 
@@ -269,7 +280,7 @@ def 
test_set_machine_type_should_throw_ex_when_missing_project_id(self, mock_hoo
         with self.assertRaises(AirflowException) as cm:
             op = GceSetMachineTypeOperator(
                 project_id="",
-                zone=LOCATION,
+                zone=ZONE,
                 resource_id=RESOURCE_ID,
                 body=SET_MACHINE_TYPE_BODY,
                 task_id='id'
@@ -299,7 +310,7 @@ def 
test_set_machine_type_should_throw_ex_when_missing_resource_id(self, mock_ho
         with self.assertRaises(AirflowException) as cm:
             op = GceSetMachineTypeOperator(
                 project_id=PROJECT_ID,
-                zone=LOCATION,
+                zone=ZONE,
                 resource_id="",
                 body=SET_MACHINE_TYPE_BODY,
                 task_id='id'
@@ -314,7 +325,7 @@ def 
test_set_machine_type_should_throw_ex_when_missing_machine_type(self, mock_h
         with self.assertRaises(AirflowException) as cm:
             op = GceSetMachineTypeOperator(
                 project_id=PROJECT_ID,
-                zone=LOCATION,
+                zone=ZONE,
                 resource_id=RESOURCE_ID,
                 body={},
                 task_id='id'
@@ -332,10 +343,10 @@ def 
test_set_machine_type_should_throw_ex_when_missing_machine_type(self, mock_h
                        "'zone': 
'https://www.googleapis.com/compute/v1/projects/polidea"; \
                        "-airflow/zones/europe-west3-b', 'operationType': " \
                        "'setMachineType', 'targetLink': " \
-                       
"'https://www.googleapis.com/compute/v1/projects/polidea-airflow"; \
+                       
"'https://www.googleapis.com/compute/v1/projects/example-airflow"; \
                        "/zones/europe-west3-b/instances/pa-1', 'targetId': " \
                        "'2480086944131075860', 'status': 'DONE', 'user': " \
-                       "'[email protected]', " 
\
+                       "'[email protected]', " 
\
                        "'progress': 100, 'insertTime': 
'2018-10-03T07:50:07.951-07:00', "\
                        "'startTime': '2018-10-03T07:50:08.324-07:00', 
'endTime': " \
                        "'2018-10-03T07:50:08.484-07:00', 'error': {'errors': 
[{'code': " \
@@ -343,35 +354,688 @@ def 
test_set_machine_type_should_throw_ex_when_missing_machine_type(self, mock_h
                        "'machine-type-1' does not exist in zone 
'europe-west3-b'.\"}]}, "\
                        "'httpErrorStatusCode': 400, 'httpErrorMessage': 'BAD 
REQUEST', " \
                        "'selfLink': " \
-                       
"'https://www.googleapis.com/compute/v1/projects/polidea-airflow"; \
+                       
"'https://www.googleapis.com/compute/v1/projects/example-airflow"; \
                        
"/zones/europe-west3-b/operations/operation-1538578207537" \
                        "-577542784f769-7999ab71-94f9ec1d'} "
 
     @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook'
-                '._check_operation_status')
+                '._check_zone_operation_status')
     @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook'
                 '._execute_set_machine_type')
     
@mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook.get_conn')
     def test_set_machine_type_should_handle_and_trim_gce_error(
-            self, get_conn, _execute_set_machine_type, 
_check_operation_status):
+            self, get_conn, _execute_set_machine_type, 
_check_zone_operation_status):
         get_conn.return_value = {}
         _execute_set_machine_type.return_value = {"name": "test-operation"}
-        _check_operation_status.return_value = 
ast.literal_eval(self.MOCK_OP_RESPONSE)
+        _check_zone_operation_status.return_value = 
ast.literal_eval(self.MOCK_OP_RESPONSE)
         with self.assertRaises(AirflowException) as cm:
             op = GceSetMachineTypeOperator(
                 project_id=PROJECT_ID,
-                zone=LOCATION,
+                zone=ZONE,
                 resource_id=RESOURCE_ID,
                 body=SET_MACHINE_TYPE_BODY,
                 task_id='id'
             )
             op.execute(None)
         err = cm.exception
-        _check_operation_status.assert_called_once_with(
-            {}, "test-operation", PROJECT_ID, LOCATION)
+        _check_zone_operation_status.assert_called_once_with(
+            {}, "test-operation", PROJECT_ID, ZONE)
         _execute_set_machine_type.assert_called_once_with(
-            PROJECT_ID, LOCATION, RESOURCE_ID, SET_MACHINE_TYPE_BODY)
+            PROJECT_ID, ZONE, RESOURCE_ID, SET_MACHINE_TYPE_BODY)
         # Checking the full message was sometimes failing due to different 
order
         # of keys in the serialized JSON
         self.assertIn("400 BAD REQUEST: {", str(err))  # checking the square 
bracket trim
         self.assertIn("UNSUPPORTED_OPERATION", str(err))
+
+
+GCE_INSTANCE_TEMPLATE_NAME = "instance-template-test"
+GCE_INSTANCE_TEMPLATE_NEW_NAME = "instance-template-test-new"
+GCE_INSTANCE_TEMPLATE_REQUEST_ID = "e12d5b48-4826-4ba9-ada6-0cff1e0b36a6"
+
+GCE_INSTANCE_TEMPLATE_BODY_GET = {
+    "kind": "compute#instanceTemplate",
+    "id": "6950321349997439715",
+    "creationTimestamp": "2018-10-15T06:20:12.777-07:00",
+    "name": GCE_INSTANCE_TEMPLATE_NAME,
+    "description": "",
+    "properties": {
+        "machineType": "n1-standard-1",
+        "networkInterfaces": [
+            {
+                "kind": "compute#networkInterface",
+                "network": "https://www.googleapis.com/compute/v1/";
+                           "projects/project/global/networks/default",
+                "accessConfigs": [
+                    {
+                        "kind": "compute#accessConfig",
+                        "type": "ONE_TO_ONE_NAT",
+                    }
+                ]
+            },
+            {
+                "network": "https://www.googleapis.com/compute/v1/";
+                           "projects/project/global/networks/default",
+                "accessConfigs": [
+                    {
+                        "kind": "compute#accessConfig",
+                        "networkTier": "PREMIUM"
+                    }
+                ]
+            }
+        ],
+        "disks": [
+            {
+                "kind": "compute#attachedDisk",
+                "type": "PERSISTENT",
+                "licenses": [
+                    "A String",
+                ]
+            }
+        ],
+        "metadata": {
+            "kind": "compute#metadata",
+            "fingerprint": "GDPUYxlwHe4="
+        },
+    },
+    "selfLink": "https://www.googleapis.com/compute/v1/projects/project";
+                "/global/instanceTemplates/instance-template-test"
+}
+
+GCE_INSTANCE_TEMPLATE_BODY_INSERT = {
+    "name": GCE_INSTANCE_TEMPLATE_NEW_NAME,
+    "description": "",
+    "properties": {
+        "machineType": "n1-standard-1",
+        "networkInterfaces": [
+            {
+                "network": "https://www.googleapis.com/compute/v1/";
+                           "projects/project/global/networks/default",
+                "accessConfigs": [
+                    {
+                        "type": "ONE_TO_ONE_NAT",
+                    }
+                ]
+            },
+            {
+                "network": "https://www.googleapis.com/compute/v1/";
+                           "projects/project/global/networks/default",
+                "accessConfigs": [
+                    {
+                        "networkTier": "PREMIUM"
+                    }
+                ]
+            }
+        ],
+        "disks": [
+            {
+                "type": "PERSISTENT",
+            }
+        ],
+        "metadata": {
+            "fingerprint": "GDPUYxlwHe4="
+        },
+    },
+}
+
+GCE_INSTANCE_TEMPLATE_BODY_GET_NEW = deepcopy(GCE_INSTANCE_TEMPLATE_BODY_GET)
+GCE_INSTANCE_TEMPLATE_BODY_GET_NEW['name'] = GCE_INSTANCE_TEMPLATE_NEW_NAME
+
+
+class GceInstanceTemplateCopyTest(unittest.TestCase):
+    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
+    def test_successful_copy_template(self, mock_hook):
+        mock_hook.return_value.get_instance_template.side_effect = [
+            HttpError(resp=httplib2.Response({'status': 404}), 
content=EMPTY_CONTENT),
+            GCE_INSTANCE_TEMPLATE_BODY_GET,
+            GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
+        ]
+        op = GceInstanceTemplateCopyOperator(
+            project_id=PROJECT_ID,
+            resource_id=GCE_INSTANCE_TEMPLATE_NAME,
+            task_id='id',
+            body_patch={"name": GCE_INSTANCE_TEMPLATE_NEW_NAME}
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(api_version='v1',
+                                          gcp_conn_id='google_cloud_default')
+        
mock_hook.return_value.insert_instance_template.assert_called_once_with(
+            project_id=PROJECT_ID,
+            body=GCE_INSTANCE_TEMPLATE_BODY_INSERT,
+            request_id=None
+        )
+        self.assertEqual(GCE_INSTANCE_TEMPLATE_BODY_GET_NEW, result)
+
+    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
+    def test_idempotent_copy_template_when_already_copied(self, mock_hook):
+        mock_hook.return_value.get_instance_template.side_effect = [
+            GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
+        ]
+        op = GceInstanceTemplateCopyOperator(
+            project_id=PROJECT_ID,
+            resource_id=GCE_INSTANCE_TEMPLATE_NAME,
+            task_id='id',
+            body_patch={"name": GCE_INSTANCE_TEMPLATE_NEW_NAME}
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(api_version='v1',
+                                          gcp_conn_id='google_cloud_default')
+        mock_hook.return_value.insert_instance_template.assert_not_called()
+        self.assertEqual(GCE_INSTANCE_TEMPLATE_BODY_GET_NEW, result)
+
+    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
+    def test_successful_copy_template_with_request_id(self, mock_hook):
+        mock_hook.return_value.get_instance_template.side_effect = [
+            HttpError(resp=httplib2.Response({'status': 404}), 
content=EMPTY_CONTENT),
+            GCE_INSTANCE_TEMPLATE_BODY_GET,
+            GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
+        ]
+        op = GceInstanceTemplateCopyOperator(
+            project_id=PROJECT_ID,
+            resource_id=GCE_INSTANCE_TEMPLATE_NAME,
+            request_id=GCE_INSTANCE_TEMPLATE_REQUEST_ID,
+            task_id='id',
+            body_patch={"name": GCE_INSTANCE_TEMPLATE_NEW_NAME}
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(api_version='v1',
+                                          gcp_conn_id='google_cloud_default')
+        
mock_hook.return_value.insert_instance_template.assert_called_once_with(
+            project_id=PROJECT_ID,
+            body=GCE_INSTANCE_TEMPLATE_BODY_INSERT,
+            request_id=GCE_INSTANCE_TEMPLATE_REQUEST_ID,
+        )
+        self.assertEqual(GCE_INSTANCE_TEMPLATE_BODY_GET_NEW, result)
+
+    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
+    def test_successful_copy_template_with_description_fields(self, mock_hook):
+        mock_hook.return_value.get_instance_template.side_effect = [
+            HttpError(resp=httplib2.Response({'status': 404}), 
content=EMPTY_CONTENT),
+            GCE_INSTANCE_TEMPLATE_BODY_GET,
+            GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
+        ]
+        op = GceInstanceTemplateCopyOperator(
+            project_id=PROJECT_ID,
+            resource_id=GCE_INSTANCE_TEMPLATE_NAME,
+            request_id=GCE_INSTANCE_TEMPLATE_REQUEST_ID,
+            task_id='id',
+            body_patch={"name": GCE_INSTANCE_TEMPLATE_NEW_NAME,
+                        "description": "New description"}
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(api_version='v1',
+                                          gcp_conn_id='google_cloud_default')
+
+        body_insert = deepcopy(GCE_INSTANCE_TEMPLATE_BODY_INSERT)
+        body_insert["description"] = "New description"
+        
mock_hook.return_value.insert_instance_template.assert_called_once_with(
+            project_id=PROJECT_ID,
+            body=body_insert,
+            request_id=GCE_INSTANCE_TEMPLATE_REQUEST_ID,
+        )
+        self.assertEqual(GCE_INSTANCE_TEMPLATE_BODY_GET_NEW, result)
+
+    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
+    def test_copy_with_some_validation_warnings(self, mock_hook):
+        mock_hook.return_value.get_instance_template.side_effect = [
+            HttpError(resp=httplib2.Response({'status': 404}), 
content=EMPTY_CONTENT),
+            GCE_INSTANCE_TEMPLATE_BODY_GET,
+            GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
+        ]
+        op = GceInstanceTemplateCopyOperator(
+            project_id=PROJECT_ID,
+            resource_id=GCE_INSTANCE_TEMPLATE_NAME,
+            task_id='id',
+            body_patch={"name": GCE_INSTANCE_TEMPLATE_NEW_NAME,
+                        "some_wrong_field": "test",
+                        "properties": {
+                            "some_other_wrong_field": "test"
+                        }}
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(api_version='v1',
+                                          gcp_conn_id='google_cloud_default')
+        body_insert = deepcopy(GCE_INSTANCE_TEMPLATE_BODY_INSERT)
+        body_insert["some_wrong_field"] = "test"
+        body_insert["properties"]["some_other_wrong_field"] = "test"
+        
mock_hook.return_value.insert_instance_template.assert_called_once_with(
+            project_id=PROJECT_ID,
+            body=body_insert,
+            request_id=None,
+        )
+        self.assertEqual(GCE_INSTANCE_TEMPLATE_BODY_GET_NEW, result)
+
+    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
+    def test_successful_copy_template_with_updated_nested_fields(self, 
mock_hook):
+        mock_hook.return_value.get_instance_template.side_effect = [
+            HttpError(resp=httplib2.Response({'status': 404}), 
content=EMPTY_CONTENT),
+            GCE_INSTANCE_TEMPLATE_BODY_GET,
+            GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
+        ]
+        op = GceInstanceTemplateCopyOperator(
+            project_id=PROJECT_ID,
+            resource_id=GCE_INSTANCE_TEMPLATE_NAME,
+            task_id='id',
+            body_patch={
+                "name": GCE_INSTANCE_TEMPLATE_NEW_NAME,
+                "properties": {
+                    "machineType": "n1-standard-2",
+                }
+            }
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(api_version='v1',
+                                          gcp_conn_id='google_cloud_default')
+        body_insert = deepcopy(GCE_INSTANCE_TEMPLATE_BODY_INSERT)
+        body_insert["properties"]["machineType"] = "n1-standard-2"
+        
mock_hook.return_value.insert_instance_template.assert_called_once_with(
+            project_id=PROJECT_ID,
+            body=body_insert,
+            request_id=None
+        )
+        self.assertEqual(GCE_INSTANCE_TEMPLATE_BODY_GET_NEW, result)
+
+    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
+    def test_successful_copy_template_with_smaller_array_fields(self, 
mock_hook):
+        mock_hook.return_value.get_instance_template.side_effect = [
+            HttpError(resp=httplib2.Response({'status': 404}), 
content=EMPTY_CONTENT),
+            GCE_INSTANCE_TEMPLATE_BODY_GET,
+            GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
+        ]
+        op = GceInstanceTemplateCopyOperator(
+            project_id=PROJECT_ID,
+            resource_id=GCE_INSTANCE_TEMPLATE_NAME,
+            task_id='id',
+            body_patch={
+                "name": GCE_INSTANCE_TEMPLATE_NEW_NAME,
+                "properties": {
+                    "machineType": "n1-standard-1",
+                    "networkInterfaces": [
+                        {
+                            "network": "https://www.googleapis.com/compute/v1/"
+                                       "projects/project/global/networks/default",
+                            "accessConfigs": [
+                                {
+                                    "type": "ONE_TO_ONE_NAT",
+                                    "natIP": "8.8.8.8"
+                                }
+                            ]
+                        }
+                    ]
+                }
+            }
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(api_version='v1',
+                                          gcp_conn_id='google_cloud_default')
+        body_insert = deepcopy(GCE_INSTANCE_TEMPLATE_BODY_INSERT)
+        body_insert["properties"]["networkInterfaces"] = [
+            {
+                "network": "https://www.googleapis.com/compute/v1/"
+                           "projects/project/global/networks/default",
+                "accessConfigs": [
+                    {
+                        "type": "ONE_TO_ONE_NAT",
+                        "natIP": "8.8.8.8"
+                    }
+                ]
+            }
+        ]
+        mock_hook.return_value.insert_instance_template.assert_called_once_with(
+            project_id=PROJECT_ID,
+            body=body_insert,
+            request_id=None
+        )
+        self.assertEqual(GCE_INSTANCE_TEMPLATE_BODY_GET_NEW, result)
+
+    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
+    def test_successful_copy_template_with_bigger_array_fields(self, mock_hook):
+        mock_hook.return_value.get_instance_template.side_effect = [
+            HttpError(resp=httplib2.Response({'status': 404}),
+                      content=EMPTY_CONTENT),
+            GCE_INSTANCE_TEMPLATE_BODY_GET,
+            GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
+        ]
+        op = GceInstanceTemplateCopyOperator(
+            project_id=PROJECT_ID,
+            resource_id=GCE_INSTANCE_TEMPLATE_NAME,
+            task_id='id',
+            body_patch={
+                "name": GCE_INSTANCE_TEMPLATE_NEW_NAME,
+                "properties": {
+                    "disks": [
+                        {
+                            "kind": "compute#attachedDisk",
+                            "type": "SCRATCH",
+                            "licenses": [
+                                "Updated String",
+                            ]
+                        },
+                        {
+                            "kind": "compute#attachedDisk",
+                            "type": "PERSISTENT",
+                            "licenses": [
+                                "Another String",
+                            ]
+                        }
+                    ],
+                }
+            }
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(api_version='v1',
+                                          gcp_conn_id='google_cloud_default')
+
+        body_insert = deepcopy(GCE_INSTANCE_TEMPLATE_BODY_INSERT)
+        body_insert["properties"]["disks"] = [
+            {
+                "kind": "compute#attachedDisk",
+                "type": "SCRATCH",
+                "licenses": [
+                    "Updated String",
+                ]
+            },
+            {
+                "kind": "compute#attachedDisk",
+                "type": "PERSISTENT",
+                "licenses": [
+                    "Another String",
+                ]
+            }
+        ]
+        mock_hook.return_value.insert_instance_template.assert_called_once_with(
+            project_id=PROJECT_ID,
+            body=body_insert,
+            request_id=None,
+        )
+        self.assertEqual(GCE_INSTANCE_TEMPLATE_BODY_GET_NEW, result)
+
+    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
+    def test_missing_name(self, mock_hook):
+        mock_hook.return_value.get_instance_template.side_effect = [
+            HttpError(resp=httplib2.Response({'status': 404}),
+                      content=EMPTY_CONTENT),
+            GCE_INSTANCE_TEMPLATE_BODY_GET,
+            GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
+        ]
+        with self.assertRaises(AirflowException) as cm:
+            op = GceInstanceTemplateCopyOperator(
+                project_id=PROJECT_ID,
+                resource_id=GCE_INSTANCE_TEMPLATE_NAME,
+                request_id=GCE_INSTANCE_TEMPLATE_REQUEST_ID,
+                task_id='id',
+                body_patch={"description": "New description"}
+            )
+            op.execute(None)
+        err = cm.exception
+        self.assertIn("should contain at least name for the new operator "
+                      "in the 'name' field", str(err))
+        mock_hook.assert_not_called()
+
+
+GCE_INSTANCE_GROUP_MANAGER_NAME = "instance-group-test"
+GCE_INSTANCE_TEMPLATE_SOURCE_URL = \
+    "https://www.googleapis.com/compute/beta/projects/project" \
+    "/global/instanceTemplates/instance-template-test"
+
+GCE_INSTANCE_TEMPLATE_OTHER_URL = \
+    "https://www.googleapis.com/compute/beta/projects/project" \
+    "/global/instanceTemplates/instance-template-other"
+
+GCE_INSTANCE_TEMPLATE_NON_EXISTING_URL = \
+    "https://www.googleapis.com/compute/beta/projects/project" \
+    "/global/instanceTemplates/instance-template-non-existing"
+
+GCE_INSTANCE_TEMPLATE_DESTINATION_URL = \
+    "https://www.googleapis.com/compute/beta/projects/project" \
+    "/global/instanceTemplates/instance-template-new"
+
+GCE_INSTANCE_GROUP_MANAGER_GET = {
+    "kind": "compute#instanceGroupManager",
+    "id": "2822359583810032488",
+    "creationTimestamp": "2018-10-17T05:39:35.793-07:00",
+    "name": GCE_INSTANCE_GROUP_MANAGER_NAME,
+    "zone": "https://www.googleapis.com/compute/beta/projects/project/zones/zone",
+    "instanceTemplate": GCE_INSTANCE_TEMPLATE_SOURCE_URL,
+    "versions": [
+        {
+            "name": "v1",
+            "instanceTemplate": GCE_INSTANCE_TEMPLATE_SOURCE_URL,
+            "targetSize": {
+                "calculated": 1
+            }
+        },
+        {
+            "name": "v2",
+            "instanceTemplate": GCE_INSTANCE_TEMPLATE_OTHER_URL,
+        }
+    ],
+    "instanceGroup": GCE_INSTANCE_TEMPLATE_SOURCE_URL,
+    "baseInstanceName": GCE_INSTANCE_GROUP_MANAGER_NAME,
+    "fingerprint": "BKWB_igCNbQ=",
+    "currentActions": {
+        "none": 1,
+        "creating": 0,
+        "creatingWithoutRetries": 0,
+        "verifying": 0,
+        "recreating": 0,
+        "deleting": 0,
+        "abandoning": 0,
+        "restarting": 0,
+        "refreshing": 0
+    },
+    "pendingActions": {
+        "creating": 0,
+        "deleting": 0,
+        "recreating": 0,
+        "restarting": 0
+    },
+    "targetSize": 1,
+    "selfLink": "https://www.googleapis.com/compute/beta/projects/project/zones/"
+                "zone/instanceGroupManagers/" + GCE_INSTANCE_GROUP_MANAGER_NAME,
+    "autoHealingPolicies": [
+        {
+            "initialDelaySec": 300
+        }
+    ],
+    "serviceAccount": "[email protected]"
+}
+
+GCE_INSTANCE_GROUP_MANAGER_EXPECTED_PATCH = {
+    "instanceTemplate": GCE_INSTANCE_TEMPLATE_DESTINATION_URL,
+    "versions": [
+        {
+            "name": "v1",
+            "instanceTemplate": GCE_INSTANCE_TEMPLATE_DESTINATION_URL,
+            "targetSize": {
+                "calculated": 1
+            }
+        },
+        {
+            "name": "v2",
+            "instanceTemplate": GCE_INSTANCE_TEMPLATE_OTHER_URL,
+        }
+    ],
+}
+
+GCE_INSTANCE_GROUP_MANAGER_REQUEST_ID = "e12d5b48-4826-4ba9-ada6-0cff1e0b36a6"
+
+GCE_INSTANCE_GROUP_MANAGER_UPDATE_POLICY = {
+    "type": "OPPORTUNISTIC",
+    "minimalAction": "RESTART",
+    "maxSurge": {
+        "fixed": 1
+    },
+    "maxUnavailable": {
+        "percent": 10
+    },
+    "minReadySec": 1800
+}
+
+
+class GceInstanceGroupManagerUpdateTest(unittest.TestCase):
+    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
+    def test_successful_instance_group_update(self, mock_hook):
+        mock_hook.return_value.get_instance_group_manager.return_value = \
+            deepcopy(GCE_INSTANCE_GROUP_MANAGER_GET)
+        op = GceInstanceGroupManagerUpdateTemplateOperator(
+            project_id=PROJECT_ID,
+            zone=ZONE,
+            resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
+            task_id='id',
+            source_template=GCE_INSTANCE_TEMPLATE_SOURCE_URL,
+            destination_template=GCE_INSTANCE_TEMPLATE_DESTINATION_URL
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(api_version='beta',
+                                          gcp_conn_id='google_cloud_default')
+        mock_hook.return_value.patch_instance_group_manager.assert_called_once_with(
+            project_id=PROJECT_ID,
+            zone=ZONE,
+            resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
+            body=GCE_INSTANCE_GROUP_MANAGER_EXPECTED_PATCH,
+            request_id=None
+        )
+        self.assertTrue(result)
+
+    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
+    def test_successful_instance_group_update_no_instance_template_field(self, mock_hook):
+        instance_group_manager_no_template = deepcopy(GCE_INSTANCE_GROUP_MANAGER_GET)
+        del instance_group_manager_no_template['instanceTemplate']
+        mock_hook.return_value.get_instance_group_manager.return_value = \
+            instance_group_manager_no_template
+        op = GceInstanceGroupManagerUpdateTemplateOperator(
+            project_id=PROJECT_ID,
+            zone=ZONE,
+            resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
+            task_id='id',
+            source_template=GCE_INSTANCE_TEMPLATE_SOURCE_URL,
+            destination_template=GCE_INSTANCE_TEMPLATE_DESTINATION_URL
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(api_version='beta',
+                                          gcp_conn_id='google_cloud_default')
+        expected_patch_no_instance_template = \
+            deepcopy(GCE_INSTANCE_GROUP_MANAGER_EXPECTED_PATCH)
+        del expected_patch_no_instance_template['instanceTemplate']
+        mock_hook.return_value.patch_instance_group_manager.assert_called_once_with(
+            project_id=PROJECT_ID,
+            zone=ZONE,
+            resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
+            body=expected_patch_no_instance_template,
+            request_id=None
+        )
+        self.assertTrue(result)
+
+    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
+    def test_successful_instance_group_update_no_versions_field(self, mock_hook):
+        instance_group_manager_no_versions = deepcopy(GCE_INSTANCE_GROUP_MANAGER_GET)
+        del instance_group_manager_no_versions['versions']
+        mock_hook.return_value.get_instance_group_manager.return_value = \
+            instance_group_manager_no_versions
+        op = GceInstanceGroupManagerUpdateTemplateOperator(
+            project_id=PROJECT_ID,
+            zone=ZONE,
+            resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
+            task_id='id',
+            source_template=GCE_INSTANCE_TEMPLATE_SOURCE_URL,
+            destination_template=GCE_INSTANCE_TEMPLATE_DESTINATION_URL
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(api_version='beta',
+                                          gcp_conn_id='google_cloud_default')
+        expected_patch_no_versions = \
+            deepcopy(GCE_INSTANCE_GROUP_MANAGER_EXPECTED_PATCH)
+        del expected_patch_no_versions['versions']
+        mock_hook.return_value.patch_instance_group_manager.assert_called_once_with(
+            project_id=PROJECT_ID,
+            zone=ZONE,
+            resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
+            body=expected_patch_no_versions,
+            request_id=None
+        )
+        self.assertTrue(result)
+
+    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
+    def test_successful_instance_group_update_with_update_policy(self, mock_hook):
+        mock_hook.return_value.get_instance_group_manager.return_value = \
+            deepcopy(GCE_INSTANCE_GROUP_MANAGER_GET)
+        op = GceInstanceGroupManagerUpdateTemplateOperator(
+            project_id=PROJECT_ID,
+            zone=ZONE,
+            resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
+            task_id='id',
+            update_policy=GCE_INSTANCE_GROUP_MANAGER_UPDATE_POLICY,
+            source_template=GCE_INSTANCE_TEMPLATE_SOURCE_URL,
+            destination_template=GCE_INSTANCE_TEMPLATE_DESTINATION_URL
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(api_version='beta',
+                                          gcp_conn_id='google_cloud_default')
+        expected_patch_with_update_policy = \
+            deepcopy(GCE_INSTANCE_GROUP_MANAGER_EXPECTED_PATCH)
+        expected_patch_with_update_policy['updatePolicy'] = \
+            GCE_INSTANCE_GROUP_MANAGER_UPDATE_POLICY
+        mock_hook.return_value.patch_instance_group_manager.assert_called_once_with(
+            project_id=PROJECT_ID,
+            zone=ZONE,
+            resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
+            body=expected_patch_with_update_policy,
+            request_id=None
+        )
+        self.assertTrue(result)
+
+    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
+    def test_successful_instance_group_update_with_request_id(self, mock_hook):
+        mock_hook.return_value.get_instance_group_manager.return_value = \
+            deepcopy(GCE_INSTANCE_GROUP_MANAGER_GET)
+        op = GceInstanceGroupManagerUpdateTemplateOperator(
+            project_id=PROJECT_ID,
+            zone=ZONE,
+            resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
+            task_id='id',
+            source_template=GCE_INSTANCE_TEMPLATE_SOURCE_URL,
+            request_id=GCE_INSTANCE_GROUP_MANAGER_REQUEST_ID,
+            destination_template=GCE_INSTANCE_TEMPLATE_DESTINATION_URL
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(api_version='beta',
+                                          gcp_conn_id='google_cloud_default')
+        mock_hook.return_value.patch_instance_group_manager.assert_called_once_with(
+            project_id=PROJECT_ID,
+            zone=ZONE,
+            resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
+            body=GCE_INSTANCE_GROUP_MANAGER_EXPECTED_PATCH,
+            request_id=GCE_INSTANCE_GROUP_MANAGER_REQUEST_ID
+        )
+        self.assertTrue(result)
+
+    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
+    def test_try_to_use_api_v1(self, mock_hook):
+        with self.assertRaises(AirflowException) as cm:
+            GceInstanceGroupManagerUpdateTemplateOperator(
+                project_id=PROJECT_ID,
+                zone=ZONE,
+                resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
+                task_id='id',
+                api_version='v1',
+                source_template=GCE_INSTANCE_TEMPLATE_SOURCE_URL,
+                destination_template=GCE_INSTANCE_TEMPLATE_DESTINATION_URL
+            )
+        err = cm.exception
+        self.assertIn("Use beta api version or above", str(err))
+
+    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
+    def test_try_to_use_non_existing_template(self, mock_hook):
+        mock_hook.return_value.get_instance_group_manager.return_value = \
+            deepcopy(GCE_INSTANCE_GROUP_MANAGER_GET)
+        op = GceInstanceGroupManagerUpdateTemplateOperator(
+            project_id=PROJECT_ID,
+            zone=ZONE,
+            resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
+            task_id='id',
+            source_template=GCE_INSTANCE_TEMPLATE_NON_EXISTING_URL,
+            destination_template=GCE_INSTANCE_TEMPLATE_DESTINATION_URL
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(api_version='beta',
+                                          gcp_conn_id='google_cloud_default')
+        mock_hook.return_value.patch_instance_group_manager.assert_not_called()
+        self.assertTrue(result)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Implement Instance Group Managers operators for GCE
> ---------------------------------------------------
>
>                 Key: AIRFLOW-3220
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3220
>             Project: Apache Airflow
>          Issue Type: New Feature
>            Reporter: Jarek Potiuk
>            Priority: Major
>             Fix For: 2.0.0
>
>
> In order to be able to manage Instance Group Managers templates for instances:
> [https://cloud.google.com/compute/docs/reference/rest/v1/instanceGroupManagers]
>  
> Two operators are needed:
>  * GceInstanceTemplateCopy - where existing template is copied with some 
> changes specified (using API from 
> https://cloud.google.com/compute/docs/reference/rest/v1/instanceTemplates)
>  * GceInstanceGroupManagerUpdateVersion -  where updated template is replaced 
> in the instance group manager with patch method: 
> https://cloud.google.com/compute/docs/reference/rest/beta/instanceGroupManagers/patch



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to