sschaetz commented on a change in pull request #13996:
URL: https://github.com/apache/airflow/pull/13996#discussion_r580062200



##########
File path: airflow/providers/google/cloud/operators/gcs.py
##########
@@ -656,6 +659,216 @@ def execute(self, context: dict) -> None:
             )
 
 
+class GCSTimeSpanFileTransformOperator(BaseOperator):
+    """
+    Determines a list of objects that were added or modified at a GCS source
+    location during a specific time-span, copies them to a temporary location
+    on the local file system, runs a transform on these files as specified by
+    the transformation script, and uploads the output to the destination bucket.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GCSTimeSpanFileTransformOperator`
+
+    The locations of the source and the destination files in the local
+    filesystem are provided as the first and second arguments to the
+    transformation script. The time-span is passed to the transformation
+    script as the third and fourth arguments, as UTC ISO 8601 strings.
+
+    The transformation script is expected to read the
+    data from source, transform it and write the output to the local
+    destination file.
+
+    :param source_bucket: The bucket to fetch data from. (templated)
+    :type source_bucket: str
+    :param source_prefix: Prefix string which filters objects whose names begin
+           with this prefix. Can interpolate execution date and time components.
+           (templated)
+    :type source_prefix: str
+    :param source_gcp_conn_id: The connection ID to use connecting to
+           Google Cloud to download files to be processed.
+    :type source_gcp_conn_id: str
+    :param source_impersonation_chain: Optional service account to impersonate
+        using short-term credentials (to download files to be processed), or
+        chained list of accounts required to get the access_token of the last
+        account in the list, which will be impersonated in the request. If set
+        as a string, the account must grant the originating account the
+        Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding
+        identity, with first account from the list granting this role to the
+        originating account (templated).
+    :type source_impersonation_chain: Union[str, Sequence[str]]
+
+    :param destination_bucket: The bucket to write data to. (templated)
+    :type destination_bucket: str
+    :param destination_prefix: Prefix string for the upload location.
+        Can interpolate execution date and time components. (templated)
+    :type destination_prefix: str
+    :param destination_gcp_conn_id: The connection ID to use connecting to
+           Google Cloud to upload processed files.
+    :type destination_gcp_conn_id: str
+    :param destination_impersonation_chain: Optional service account to
+        impersonate using short-term credentials (to upload processed files),
+        or chained list of accounts required to get the access_token of the
+        last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding
+        identity, with first account from the list granting this role to the
+        originating account (templated).
+    :type destination_impersonation_chain: Union[str, Sequence[str]]
+
+    :param transform_script: location of the executable transformation script
+        or list of arguments passed to subprocess, e.g.
+        `['python', 'script.py', 10]`. (templated)
+    :type transform_script: Union[str, List[str]]
+
+    """
+
+    template_fields = (
+        'source_bucket',
+        'source_prefix',
+        'destination_bucket',
+        'destination_prefix',
+        'transform_script',
+        'source_impersonation_chain',
+        'destination_impersonation_chain',
+    )
+
+    @staticmethod
+    def interpolate_prefix(prefix, dt):
+        """Interpolate prefix with datetime.
+
+        :param prefix: The prefix to interpolate
+        :type prefix: str
+        :param dt: The datetime to interpolate
+        :type dt: datetime
+
+        """
+        return None if prefix is None else dt.strftime(prefix)
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        source_bucket: str,
+        source_prefix: str,
+        source_gcp_conn_id: str,
+        destination_bucket: str,
+        destination_prefix: str,
+        destination_gcp_conn_id: str,
+        transform_script: Union[str, List[str]],
+        source_impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        destination_impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.source_bucket = source_bucket
+        self.source_prefix = source_prefix
+        self.source_gcp_conn_id = source_gcp_conn_id
+        self.source_impersonation_chain = source_impersonation_chain
+
+        self.destination_bucket = destination_bucket
+        self.destination_prefix = destination_prefix
+        self.destination_gcp_conn_id = destination_gcp_conn_id
+        self.destination_impersonation_chain = destination_impersonation_chain
+
+        self.transform_script = transform_script
+        self.output_encoding = sys.getdefaultencoding()
+
+    def execute(self, context: dict) -> None:
+        # Define intervals and prefixes.
+        timespan_start = context["execution_date"]
+        timespan_end = context["dag"].following_schedule(timespan_start)
+        if timespan_end is None:
+            timespan_end = datetime.datetime.max
+            self.log.warning("No following schedule found, setting timespan end to max %s", timespan_end)
+
+        timespan_start = timespan_start.replace(tzinfo=timezone.utc)
+        timespan_end = timespan_end.replace(tzinfo=timezone.utc)
+
+        source_prefix_interp = GCSTimeSpanFileTransformOperator.interpolate_prefix(
+            self.source_prefix,
+            timespan_start,
+        )
+        destination_prefix_interp = GCSTimeSpanFileTransformOperator.interpolate_prefix(
+            self.destination_prefix,
+            timespan_start,
+        )
+
+        source_hook = GCSHook(
+            gcp_conn_id=self.source_gcp_conn_id,
+            impersonation_chain=self.source_impersonation_chain,
+        )
+        destination_hook = GCSHook(
+            gcp_conn_id=self.destination_gcp_conn_id,
+            impersonation_chain=self.destination_impersonation_chain,
+        )
+
+        # Fetch list of files.
+        blobs_to_transform = source_hook.list_by_timespan(
+            bucket_name=self.source_bucket,
+            prefix=source_prefix_interp,
+            timespan_start=timespan_start,
+            timespan_end=timespan_end,
+        )
+
+        with TemporaryDirectory() as temp_input_dir, TemporaryDirectory() as temp_output_dir:
+            temp_input_dir = Path(temp_input_dir)
+            temp_output_dir = Path(temp_output_dir)
+
+            # TODO: download in parallel.
+            for blob_to_transform in blobs_to_transform:
+                destination_file = temp_input_dir / blob_to_transform
+                destination_file.parent.mkdir(parents=True, exist_ok=True)
+                source_hook.download(
+                    bucket_name=self.source_bucket,
+                    object_name=blob_to_transform,
+                    filename=str(destination_file),
+                )

Review comment:
       Thanks for looking at this in more detail. 
   
   > This is not best way to assure idempotency I think
   
   I don't see a difference between multiple files and a single file with
   regard to idempotency: if a single download fails, the entire task fails
   and nothing is produced. What I plan to follow up with is a retry
   mechanism for the GCS download and upload; in production, this has
   significantly increased the reliability of our tasks that interact with
   GCS.
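   
   To give a sense of what I have in mind, here is a rough sketch of such a
   retry wrapper (using `tenacity`, which Airflow already depends on; the
   helper name and retry parameters are illustrative and would likely be
   exposed as operator arguments):
   
   ```python
   from tenacity import retry, stop_after_attempt, wait_exponential
   
   # Illustrative policy: up to 5 attempts with exponential backoff.
   @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=60))
   def _download_with_retry(hook, bucket_name, object_name, filename):
       hook.download(bucket_name=bucket_name, object_name=object_name, filename=filename)
   ```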
   
   > Also downloading multiple files to single temp dir my create some memory issues.
   
   Indeed, this can be an issue. The loop could be changed to `download single
   file -> process single file`, but then we could not take advantage of
   parallel downloads, which is something I want to follow up with as well
   (see the sketch below).
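   
   A rough sketch of what the parallel download could look like (reusing the
   names from the diff above; the worker count is illustrative):
   
   ```python
   from concurrent.futures import ThreadPoolExecutor, as_completed
   
   def _download_blob(blob_name):
       destination_file = temp_input_dir / blob_name
       destination_file.parent.mkdir(parents=True, exist_ok=True)
       source_hook.download(
           bucket_name=self.source_bucket,
           object_name=blob_name,
           filename=str(destination_file),
       )
   
   # Download up to 8 objects concurrently; any single failure still fails the task.
   with ThreadPoolExecutor(max_workers=8) as executor:
       futures = [executor.submit(_download_blob, blob) for blob in blobs_to_transform]
       for future in as_completed(futures):
           future.result()  # re-raises a download error, if any
   ```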




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

