pabloem commented on code in PR #22721:
URL: https://github.com/apache/beam/pull/22721#discussion_r958762147


##########
playground/infrastructure/datastore_client.py:
##########
@@ -0,0 +1,202 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Module contains the client to communicate with Google Cloud Datastore
+"""
+import logging
+import os.path
+from datetime import datetime
+from typing import List
+
+from google.cloud import datastore
+from tqdm import tqdm
+
+import config
+from config import Config, PrecompiledExample, DatastoreProps
+from helper import Example
+
+from api.v1.api_pb2 import Sdk, PrecompiledObjectType
+
+
+class DatastoreException(Exception):
+    def __init__(self, error: str):
+        super().__init__()
+        self.msg = error
+
+    def __str__(self):
+        return self.msg
+
+
+# Google Datastore documentation link: 
https://cloud.google.com/datastore/docs/concepts
+class DatastoreClient:
+    """DatastoreClient is a datastore client for sending a request to the 
Google."""
+
+    def __init__(self):
+        self._check_envs()
+        self._datastore_client = 
datastore.Client(namespace=DatastoreProps.NAMESPACE, 
project=Config.GOOGLE_CLOUD_PROJECT)
+
+    def _check_envs(self):
+        if Config.GOOGLE_CLOUD_PROJECT is None:
+            raise KeyError("GOOGLE_CLOUD_PROJECT environment variable should 
be specified in os")
+
+    def save_to_cloud_datastore(self, examples_from_rep: List[Example], sdk: 
Sdk):
+        """
+        Save examples, output and meta to datastore
+        Args:
+            :param sdk: sdk from parameters
+            :param examples_from_rep: examples from the repository for saving 
to the Cloud Datastore
+        """
+        # initialise data
+        snippets = []
+        examples = []
+        pc_objects = []
+        files = []
+        updated_example_ids = []
+        now = datetime.today()
+
+        # retrieve the last schema version
+        actual_schema_version_key = self._get_actual_schema_version_key()
+
+        # retrieve all example keys before updating
+        examples_ids_before_updating = self._get_all_examples(sdk)
+
+        # loop through every example to save them to the Cloud Datastore
+        with self._datastore_client.transaction():
+            for example in tqdm(examples_from_rep):
+                sdk_key = self._get_key(DatastoreProps.SDK_KIND, 
Sdk.Name(example.sdk))
+                example_id = 
f"{Sdk.Name(example.sdk)}{config.DatastoreProps.KEY_NAME_DELIMITER}{example.name}"
+                updated_example_ids.append(example_id)
+                self._to_example_entities(example, example_id, sdk_key, 
actual_schema_version_key, examples)
+                self._to_snippet_entities(example, example_id, sdk_key, now, 
actual_schema_version_key, snippets)
+                self._to_pc_object_entities(example, example_id, pc_objects)
+                self._to_file_entities(example, example_id, files)
+
+            self._datastore_client.put_multi(examples)
+            self._datastore_client.put_multi(snippets)
+            self._datastore_client.put_multi(pc_objects)
+            self._datastore_client.put_multi(files)
+
+            # delete examples from the Cloud Datastore that are not in the 
repository
+            examples_ids_for_removing = list(filter(lambda key: key not in 
updated_example_ids, examples_ids_before_updating))
+            if len(examples_ids_for_removing) != 0:
+                logging.info("Start of deleting extra playground examples ...")
+                examples_keys_for_removing = list(map(lambda ex_id: 
self._get_key(DatastoreProps.EXAMPLE_KIND, ex_id), examples_ids_for_removing))
+                snippets_keys_for_removing = list(map(lambda ex_id: 
self._get_key(DatastoreProps.SNIPPET_KIND, ex_id), examples_ids_for_removing))
+                file_keys_for_removing = list(map(lambda ex_id: 
self._get_key(DatastoreProps.FILED_KIND, 
f"{ex_id}{config.DatastoreProps.KEY_NAME_DELIMITER}{0}"), 
examples_ids_for_removing))
+                pc_objs_keys_for_removing = []
+                for example_id_item in examples_ids_for_removing:
+                    for example_type in 
[PrecompiledExample.GRAPH_EXTENSION.upper(), 
PrecompiledExample.OUTPUT_EXTENSION.upper(), 
PrecompiledExample.LOG_EXTENSION.upper()]:
+                        
pc_objs_keys_for_removing.append(self._get_key(DatastoreProps.PRECOMPILED_OBJECT_KIND,
 f"{example_id_item}{config.DatastoreProps.KEY_NAME_DELIMITER}{example_type}"))

Review Comment:
   Maybe we should add auto-formatting for this code base, so that it's 
consistent with the rest of the Beam Python code base? (This can be done as a 
follow-up.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to