TobKed commented on a change in pull request #10121:
URL: https://github.com/apache/airflow/pull/10121#discussion_r503299338



##########
File path: airflow/providers/google/cloud/hooks/cloud_memorystore.py
##########
@@ -499,3 +502,434 @@ def update_instance(
         )
         result.result()
         self.log.info("Instance updated: %s", instance.name)
+
+
+class CloudMemorystoreMemcachedHook(GoogleBaseHook):
+    """
+    Hook for Google Cloud Memorystore for Memcached service APIs.
+
+    All the methods in the hook where project_id is used must be called with
+    keyword arguments rather than positional.
+
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
+        if any. For this to work, the service account making the request must 
have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account.
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    def __init__(
+        self,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super().__init__(
+            gcp_conn_id=gcp_conn_id, delegate_to=delegate_to, 
impersonation_chain=impersonation_chain,
+        )
+        self._client: Optional[CloudMemcacheClient] = None
+
+    def get_conn(self,):
+        """
+        Retrieves client library object that allow access to Cloud Memorystore 
Memcached service.
+        """
+        if not self._client:
+            self._client = 
CloudMemcacheClient(credentials=self._get_credentials())
+        return self._client
+
+    @staticmethod
+    def _append_label(instance: cloud_memcache.Instance, key: str, val: str) 
-> cloud_memcache.Instance:
+        """
+        Append labels to provided Instance type
+
+        Labels must fit the regex ``[a-z]([-a-z0-9]*[a-z0-9])?`` (current
+         airflow version string follows semantic versioning spec: x.y.z).
+
+        :param instance: The proto to append resource_label airflow
+            version to
+        :type instance: 
google.cloud.memcache_v1beta2.types.cloud_memcache.Instance
+        :param key: The key label
+        :type key: str
+        :param val:
+        :type val: str
+        :return: The cluster proto updated with new label
+        """
+        val = val.replace(".", "-").replace("+", "-")
+        instance.labels.update({key: val})
+        return instance
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def apply_parameters(
+        self,
+        node_ids: Sequence[str],
+        apply_all: bool,
+        project_id: str,
+        location: str,
+        instance_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Will update current set of Parameters to the set of specified nodes of 
the Memcached Instance.
+
+        :param node_ids: Nodes to which we should apply the instance-level 
parameter group.
+        :type node_ids: Sequence[str]
+        :param apply_all: Whether to apply instance-level parameter group to 
all nodes. If set to true,
+            will explicitly restrict users from specifying any nodes, and 
apply parameter group updates
+            to all nodes within the instance.
+        :type apply_all: bool
+        :param location: The location of the Cloud Memorystore instance (for 
example europe-west1)
+        :type location: str
+        :param instance_id: The logical name of the Memcached instance in the 
customer project.
+        :type instance_id: str
+        :param project_id: Project ID of the project that contains the 
instance. If set
+            to None or missing, the default project_id from the Google Cloud 
connection is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is 
specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the 
request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual 
attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        client = self.get_conn()
+
+        name = CloudMemcacheClient.instance_path(project_id, location, 
instance_id)
+
+        self.log.info("Applying update to instance: %s", instance_id)
+        result = client.apply_parameters(
+            name=name, node_ids=node_ids, apply_all=apply_all, retry=retry, 
timeout=timeout, metadata=metadata
+        )
+        result.result()
+        self.log.info("Instance updated: %s", instance_id)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_instance(
+        self,
+        location: str,
+        instance_id: str,
+        instance: Union[Dict, cloud_memcache.Instance],
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Creates a Memcached instance based on the specified tier and memory 
size.
+
+        By default, the instance is accessible from the project's `default 
network
+        
<https://cloud.google.com/compute/docs/networks-and-firewalls#networks>`__.
+
+        :param location: The location of the Cloud Memorystore instance (for 
example europe-west1)
+        :type location: str
+        :param instance_id: Required. The logical name of the Memcached 
instance in the customer project
+            with the following restrictions:
+
+            -  Must contain only lowercase letters, numbers, and hyphens.
+            -  Must start with a letter.
+            -  Must be between 1-40 characters.
+            -  Must end with a number or a letter.
+            -  Must be unique within the customer project / location
+        :type instance_id: str
+        :param instance: Required. A Memcached [Instance] resource
+
+            If a dict is provided, it must be of the same form as the protobuf 
message
+            
:class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.Instance`
+        :type instance: Union[Dict, 
google.cloud.memcache_v1beta2.types.cloud_memcache.Instance]
+        :param project_id: Project ID of the project that contains the 
instance. If set
+            to None or missing, the default project_id from the GCP connection 
is used.
+        :type project_id: str
+        :param retry: A retry object used to retry requests. If ``None`` is 
specified, requests will not be
+            retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The amount of time, in seconds, to wait for the 
request to complete. Note that if
+            ``retry`` is specified, the timeout applies to each individual 
attempt.
+        :type timeout: float
+        :param metadata: Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        client = self.get_conn()
+        parent = path_template.expand(
+            "projects/{project}/locations/{location}", project=project_id, 
location=location
+        )
+        instance_name = CloudMemcacheClient.instance_path(project_id, 
location, instance_id)
+        try:
+            instance = client.get_instance(
+                name=instance_name, retry=retry, timeout=timeout, 
metadata=metadata
+            )
+            self.log.info("Instance exists. Skipping creation.")
+            return instance
+        except NotFound:
+            self.log.info("Instance not exists.")
+
+        if isinstance(instance, dict):
+            instance = ParseDict(instance, cloud_memcache.Instance())

Review comment:
       I created an issue for it: 
https://github.com/googleapis/python-memcache/issues/19
   `ParseDict` and `MessageToDict` have problems parsing 
`google.cloud.memcache_v1beta2.types.cloud_memcache.Instance`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to