turbaszek commented on a change in pull request #7743: [AIRFLOW-7075] Operators for storing information from GCS into GA
URL: https://github.com/apache/airflow/pull/7743#discussion_r396982305
########## File path: airflow/providers/google/marketing_platform/operators/analytics.py ########## @@ -169,13 +173,273 @@ def __init__( self.account_id = account_id self.web_property_id = web_property_id self.api_version = api_version - self.gcp_connection_id = gcp_connection_id + self.gcp_conn_id = gcp_conn_id def execute(self, context): hook = GoogleAnalyticsHook( - api_version=self.api_version, gcp_connection_id=self.gcp_connection_id + api_version=self.api_version, gcp_conn_id=self.gcp_conn_id ) result = hook.list_ad_words_links( account_id=self.account_id, web_property_id=self.web_property_id, ) return result + + +class GoogleAnalyticsDataImportUploadOperator(BaseOperator): + """ + Take a file from Cloud Storage and uploads it to GA via data import API. + + :param storage_bucket: The Google cloud storage bucket where the file is stored. + :type storage_bucket: str + :param storage_name_object: The name of the object in the desired Google cloud + storage bucket. (templated) If the destination points to an existing + folder, the file will be taken from the specified folder. + :type storage_name_object: str + :param account_id: The GA account Id (long) to which the data upload belongs + :type account_id: str + :param web_property_id: The web property UA-string associated with the upload + :type web_property_id: str + :param custom_data_source_id: The id to which the data import belongs + :type custom_data_source_id: str + :param resumable_upload: flag to upload the file in a resumable fashion, using a + series of at least two requests + :type resumable_upload: bool + :param gcp_conn_id: The connection ID to use when fetching connection info. + :type gcp_conn_id: str + :param delegate_to: The account to impersonate, if any. + :type delegate_to: str + :param api_version: The version of the api that will be requested for example 'v3'. 
+ :type api_version: str + """ + + template_fields = ("storage_bucket", "storage_name_object") + + @apply_defaults + def __init__( + self, + storage_bucket: str, + storage_name_object: str, + account_id: str, + web_property_id: str, + custom_data_source_id: str, + resumable_upload: bool = False, + gcp_conn_id: str = "google_cloud_default", + delegate_to: Optional[str] = None, + api_version: str = "v3", + *args, + **kwargs + ): + super().__init__(*args, **kwargs) + self.storage_bucket = storage_bucket + self.storage_name_object = storage_name_object + self.account_id = account_id + self.web_property_id = web_property_id + self.custom_data_source_id = custom_data_source_id + self.resumable_upload = resumable_upload + self.gcp_conn_id = gcp_conn_id + self.delegate_to = delegate_to + self.api_version = api_version + + def execute(self, context): + gcs_hook = GCSHook( + gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to + ) + + ga_hook = GoogleAnalyticsHook( + gcp_conn_id=self.gcp_conn_id, + delegate_to=self.delegate_to, + api_version=self.api_version, + ) + + with NamedTemporaryFile("w+") as tmp_file: + self.log.info( + "Downloading file from GCS: %s/%s ", + self.storage_bucket, + self.storage_name_object, + ) + gcs_hook.download( + bucket_name=self.storage_bucket, + object_name=self.storage_name_object, + filename=tmp_file.name, + ) + + ga_hook.upload_data( + tmp_file.name, + self.account_id, + self.web_property_id, + self.custom_data_source_id, + self.resumable_upload, + ) + + +class GoogleAnalyticsDeletePreviousDataUploadsOperator(BaseOperator): + """ + Deletes previous GA uploads to leave the latest file to control the size of the Data Set Quota. 
+ + :param account_id: The GA account Id (long) to which the data upload belongs + :type account_id: str + :param web_property_id: The web property UA-string associated with the upload + :type web_property_id: str + :param custom_data_source_id: The id to which the data import belongs + :type custom_data_source_id: str + :param gcp_conn_id: The connection ID to use when fetching connection info. + :type gcp_conn_id: str + :param delegate_to: The account to impersonate, if any. + :type delegate_to: str + :param api_version: The version of the api that will be requested for example 'v3'. + :type api_version: str + """ + + def __init__( + self, + account_id: str, + web_property_id: str, + custom_data_source_id: str, + gcp_conn_id: str = "google_cloud_default", + delegate_to: Optional[str] = None, + api_version: str = "v3", + *args, + **kwargs + ): + super().__init__(*args, **kwargs) + + self.account_id = account_id + self.web_property_id = web_property_id + self.custom_data_source_id = custom_data_source_id + self.gcp_conn_id = gcp_conn_id + self.delegate_to = delegate_to + self.api_version = api_version + + def execute(self, context): + ga_hook = GoogleAnalyticsHook( + gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to, api_version=self.api_version + ) + + uploads = ga_hook.list_uploads( + account_id=self.account_id, + web_property_id=self.web_property_id, + custom_data_source_id=self.custom_data_source_id, + ) + + cids = [upload["id"] for upload in uploads] Review comment: As per next line: ``` delete_request_body = {"customDataImportUids": cids ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services