TobKed commented on a change in pull request #10121:
URL: https://github.com/apache/airflow/pull/10121#discussion_r503211300



##########
File path: airflow/providers/google/cloud/hooks/cloud_memorystore.py
##########
@@ -503,3 +506,438 @@ def update_instance(
         )
         result.result()
         self.log.info("Instance updated: %s", instance.name)
+
+
+class CloudMemorystoreMemcachedHook(GoogleBaseHook):
+    """
+    Hook for Google Cloud Memorystore for Memcached service APIs.
+
+    All the methods in the hook where project_id is used must be called with
+    keyword arguments rather than positional.
+
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
+        if any. For this to work, the service account making the request must 
have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account.
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    def __init__(
+        self,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
+        self._client: Optional[CloudMemcacheClient] = None
+
+    def get_conn(
+        self,
+    ):
+        """
+        Retrieves client library object that allow access to Cloud Memorystore 
Memcached service.
+        """
+        if not self._client:
+            self._client = 
CloudMemcacheClient(credentials=self._get_credentials())
+        return self._client
+
+    @staticmethod
+    def _append_label(instance: cloud_memcache.Instance, key: str, val: str) 
-> cloud_memcache.Instance:
+        """
+        Append labels to provided Instance type
+
+        Labels must fit the regex ``[a-z]([-a-z0-9]*[a-z0-9])?`` (current
+         airflow version string follows semantic versioning spec: x.y.z).
+
+        :param instance: The proto to append resource_label airflow
+            version to
+        :type instance: 
google.cloud.memcache_v1beta2.types.cloud_memcache.Instance
+        :param key: The key label
+        :type key: str
+        :param val:
+        :type val: str
+        :return: The cluster proto updated with new label
+        """
+        val = val.replace(".", "-").replace("+", "-")
+        instance.labels.update({key: val})
+        return instance
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def apply_parameters(
+        self,
+        node_ids: Sequence[str],
+        apply_all: bool,
+        project_id: str,
+        location: str,
+        instance_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = (),
+    ):
+        """
+        Will update current set of Parameters to the set of specified nodes of 
the Memcached Instance.
+
+        :param node_ids: Nodes to which we should apply the instance-level 
parameter group.
+        :type node_ids: Sequence[str]
+        :param apply_all: Whether to apply instance-level parameter group to 
all nodes. If set to true,
+            will explicitly restrict users from specifying any nodes, and 
apply parameter group updates
+            to all nodes within the instance.
+        :type apply_all: bool
+        :param location: The location of the Cloud Memorystore instance (for 
example europe-west1)
+        :type location: str
+        :param instance_id: The logical name of the Memcached instance in the 
customer project.
+        :type instance_id: str
+        :param project_id: Project ID of the project that contains the 
instance. If set
+            to None or missing, the default project_id from the Google Cloud 
connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is 
specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the 
request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual 
attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        client = self.get_conn()
+
+        name = CloudMemcacheClient.instance_path(project_id, location, 
instance_id)
+
+        self.log.info("Applying update to instance: %s", instance_id)
+        result = client.apply_parameters(
+            name=name, node_ids=node_ids, apply_all=apply_all, retry=retry, 
timeout=timeout, metadata=metadata
+        )
+        result.result()
+        self.log.info("Instance updated: %s", instance_id)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_instance(
+        self,
+        location: str,
+        instance_id: str,
+        instance: Union[Dict, cloud_memcache.Instance],
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = (),

Review comment:
       When I tried to run the example DAG, I got the following error:
   ```
     File 
"/usr/local/lib/python3.7/site-packages/google/cloud/memcache_v1beta2/services/cloud_memcache/client.py",
 line 385, in get_instance
       metadata = tuple(metadata) + (
   TypeError: 'NoneType' object is not iterable
   ```
   because the default value of `metadata` in the operator is `None`.
   IMHO, for optional parameters that take iterables, it is better to set the
default value to `None` and handle it later. It is more intuitive, lets the user
pass either `None` or `()`, and avoids situations like this one.
   This approach is used, e.g., here: 
https://github.com/apache/airflow/blob/2bf7b7cac7858f5a6a495f1a9eb4780ec84f95b4/airflow/providers/amazon/aws/operators/emr_add_steps.py#L69
   
   ```suggestion
           metadata: Optional[Sequence[Tuple[str, str]]] = None,
           
           ...
           
           metadata = metadata or ()
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to