turbaszek commented on a change in pull request #7743: [AIRFLOW-7075] Operators for storing information from GCS into GA
URL: https://github.com/apache/airflow/pull/7743#discussion_r396982305
 
 

 ##########
 File path: airflow/providers/google/marketing_platform/operators/analytics.py
 ##########
 @@ -169,13 +173,273 @@ def __init__(
         self.account_id = account_id
         self.web_property_id = web_property_id
         self.api_version = api_version
-        self.gcp_connection_id = gcp_connection_id
+        self.gcp_conn_id = gcp_conn_id
 
     def execute(self, context):
+        """Return the AdWords links that GoogleAnalyticsHook lists for the configured account and web property."""
         hook = GoogleAnalyticsHook(
-            api_version=self.api_version, gcp_connection_id=self.gcp_connection_id
+            api_version=self.api_version, gcp_conn_id=self.gcp_conn_id
         )
         result = hook.list_ad_words_links(
             account_id=self.account_id, web_property_id=self.web_property_id,
         )
         return result
+
+
+class GoogleAnalyticsDataImportUploadOperator(BaseOperator):
+    """
+    Takes a file from Cloud Storage and uploads it to GA via the data import API.
+
+    :param storage_bucket: The Google Cloud Storage bucket where the file is stored.
+        (templated)
+    :type storage_bucket: str
+    :param storage_name_object: The name of the object in the Google Cloud Storage
+          bucket. (templated) If the object name contains a folder prefix, the file
+          is taken from that folder.
+    :type storage_name_object: str
+    :param account_id: The GA account Id (long) to which the data upload belongs.
+    :type account_id: str
+    :param web_property_id: The web property UA-string associated with the upload.
+    :type web_property_id: str
+    :param custom_data_source_id: The id to which the data import belongs.
+    :type custom_data_source_id: str
+    :param resumable_upload: Flag to upload the file in a resumable fashion, using a
+        series of at least two requests.
+    :type resumable_upload: bool
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+    :type delegate_to: str
+    :param api_version: The version of the API that will be requested, for example 'v3'.
+    :type api_version: str
+    """
+
+    # The GCS location fields are rendered as Airflow templates before execute().
+    template_fields = ("storage_bucket", "storage_name_object")
+
+    @apply_defaults
+    def __init__(
+        self,
+        storage_bucket: str,
+        storage_name_object: str,
+        account_id: str,
+        web_property_id: str,
+        custom_data_source_id: str,
+        resumable_upload: bool = False,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        api_version: str = "v3",
+        *args,
+        **kwargs
+    ):
+        super().__init__(*args, **kwargs)
+        self.storage_bucket = storage_bucket
+        self.storage_name_object = storage_name_object
+        self.account_id = account_id
+        self.web_property_id = web_property_id
+        self.custom_data_source_id = custom_data_source_id
+        self.resumable_upload = resumable_upload
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.api_version = api_version
+
+    def execute(self, context):
+        """Download the GCS object to a temporary local file and upload it to GA."""
+        # Both hooks reuse the operator's connection and impersonation settings.
+        gcs_hook = GCSHook(
+            gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to
+        )
+
+        ga_hook = GoogleAnalyticsHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            api_version=self.api_version,
+        )
+
+        # NamedTemporaryFile deletes the file when the ``with`` block exits, so the
+        # GA upload has to happen inside the block, after the download completes.
+        with NamedTemporaryFile("w+") as tmp_file:
+            self.log.info(
+                "Downloading file from GCS: %s/%s ",
+                self.storage_bucket,
+                self.storage_name_object,
+            )
+            gcs_hook.download(
+                bucket_name=self.storage_bucket,
+                object_name=self.storage_name_object,
+                filename=tmp_file.name,
+            )
+
+            # NOTE(review): arguments are passed positionally; this assumes their order
+            # matches GoogleAnalyticsHook.upload_data's signature — TODO confirm.
+            ga_hook.upload_data(
+                tmp_file.name,
+                self.account_id,
+                self.web_property_id,
+                self.custom_data_source_id,
+                self.resumable_upload,
+            )
+
+
+class GoogleAnalyticsDeletePreviousDataUploadsOperator(BaseOperator):
+    """
+    Deletes previous GA uploads to leave the latest file to control the size 
of the Data Set Quota.
+
+    :param account_id: The GA account Id (long) to which the data upload 
belongs
+    :type account_id: str
+    :param web_property_id: The web property UA-string associated with the 
upload
+    :type web_property_id: str
+    :param custom_data_source_id: The id to which the data import belongs
+    :type custom_data_source_id: str
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+    :type delegate_to: str
+    :param api_version: The version of the api that will be requested for 
example 'v3'.
+    :type api_version: str
+    """
+
+    def __init__(
+        self,
+        account_id: str,
+        web_property_id: str,
+        custom_data_source_id: str,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        api_version: str = "v3",
+        *args,
+        **kwargs
+    ):
+        super().__init__(*args, **kwargs)
+
+        self.account_id = account_id
+        self.web_property_id = web_property_id
+        self.custom_data_source_id = custom_data_source_id
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.api_version = api_version
+
+    def execute(self, context):
+        ga_hook = GoogleAnalyticsHook(
+            gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to, 
api_version=self.api_version
+        )
+
+        uploads = ga_hook.list_uploads(
+            account_id=self.account_id,
+            web_property_id=self.web_property_id,
+            custom_data_source_id=self.custom_data_source_id,
+        )
+
+        cids = [upload["id"] for upload in uploads]
 
 Review comment:
   As per the next line:
   ```
   delete_request_body = {"customDataImportUids": cids
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to