turbaszek commented on a change in pull request #13996:
URL: https://github.com/apache/airflow/pull/13996#discussion_r580053097



##########
File path: airflow/providers/google/cloud/operators/gcs.py
##########
@@ -656,6 +659,216 @@ def execute(self, context: dict) -> None:
             )
 
 
+class GCSTimeSpanFileTransformOperator(BaseOperator):
+    """
+    Determines a list of objects that were added or modified at a GCS source
+    location during a specific time-span, copies them to a temporary location
+    on the local file system, runs a transform on this file as specified by
+    the transformation script and uploads the output to the destination bucket.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:GCSTimeSpanFileTransformOperator`
+
+    The locations of the source and the destination files in the local
+    filesystem is provided as an first and second arguments to the
+    transformation script. The time-span is passed to the transform script as
+    third and fourth argument as UTC ISO 8601 string.
+
+    The transformation script is expected to read the
+    data from source, transform it and write the output to the local
+    destination file.
+
+    :param source_bucket: The bucket to fetch data from. (templated)
+    :type source_bucket: str
+    :param source_prefix: Prefix string which filters objects whose name begin 
with
+           this prefix. Can interpolate execution date and time components. 
(templated)
+    :type source_prefix: str
+    :param source_gcp_conn_id: The connection ID to use connecting to Google 
Cloud
+           to download files to be processed.
+    :type source_gcp_conn_id: str
+    :param source_impersonation_chain: Optional service account to impersonate 
using short-term
+        credentials (to download files to be processed), or chained list of 
accounts required to
+        get the access_token of the last account in the list, which will be 
impersonated in the
+        request. If set as a string, the account must grant the originating 
account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    :type source_impersonation_chain: Union[str, Sequence[str]]
+
+    :param destination_bucket: The bucket to write data to. (templated)
+    :type destination_bucket: str
+    :param destination_prefix: Prefix string for the upload location.
+        Can interpolate execution date and time components. (templated)
+    :type destination_prefix: str
+    :param destination_gcp_conn_id: The connection ID to use connecting to 
Google Cloud
+           to upload processed files.
+    :type destination_gcp_conn_id: str
+    :param destination_impersonation_chain: Optional service account to 
impersonate using short-term
+        credentials (to upload processed files), or chained list of accounts 
required to get the access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    :type destination_impersonation_chain: Union[str, Sequence[str]]
+
+    :param transform_script: location of the executable transformation script 
or list of arguments
+        passed to subprocess ex. `['python', 'script.py', 10]`. (templated)
+    :type transform_script: Union[str, List[str]]
+
+    """
+
+    template_fields = (
+        'source_bucket',
+        'source_prefix',
+        'destination_bucket',
+        'destination_prefix',
+        'transform_script',
+        'source_impersonation_chain',
+        'destination_impersonation_chain',
+    )
+
+    @staticmethod
+    def interpolate_prefix(prefix, dt):
+        """Interpolate prefix with datetime.
+
+        :param prefix: The prefix to interpolate
+        :type prefix: str
+        :param dt: The datetime to interpolate
+        :type dt: datetime
+
+        """
+        return None if prefix is None else dt.strftime(prefix)
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        source_bucket: str,
+        source_prefix: str,
+        source_gcp_conn_id: str,
+        destination_bucket: str,
+        destination_prefix: str,
+        destination_gcp_conn_id: str,
+        transform_script: Union[str, List[str]],
+        source_impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        destination_impersonation_chain: Optional[Union[str, Sequence[str]]] = 
None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.source_bucket = source_bucket
+        self.source_prefix = source_prefix
+        self.source_gcp_conn_id = source_gcp_conn_id
+        self.source_impersonation_chain = source_impersonation_chain
+
+        self.destination_bucket = destination_bucket
+        self.destination_prefix = destination_prefix
+        self.destination_gcp_conn_id = destination_gcp_conn_id
+        self.destination_impersonation_chain = destination_impersonation_chain
+
+        self.transform_script = transform_script
+        self.output_encoding = sys.getdefaultencoding()
+
+    def execute(self, context: dict) -> None:
+        # Define intervals and prefixes.
+        timespan_start = context["execution_date"]
+        timespan_end = context["dag"].following_schedule(timespan_start)
+        if timespan_end is None:
+            self.log.warning("No following schedule found, setting timespan 
end to max %s", timespan_end)
+            timespan_end = datetime.datetime.max
+
+        timespan_start = timespan_start.replace(tzinfo=timezone.utc)
+        timespan_end = timespan_end.replace(tzinfo=timezone.utc)
+
+        source_prefix_interp = 
GCSTimeSpanFileTransformOperator.interpolate_prefix(
+            self.source_prefix,
+            timespan_start,
+        )
+        destination_prefix_interp = 
GCSTimeSpanFileTransformOperator.interpolate_prefix(
+            self.destination_prefix,
+            timespan_start,
+        )
+
+        source_hook = GCSHook(
+            gcp_conn_id=self.source_gcp_conn_id,
+            impersonation_chain=self.source_impersonation_chain,
+        )
+        destination_hook = GCSHook(
+            gcp_conn_id=self.destination_gcp_conn_id,
+            impersonation_chain=self.destination_impersonation_chain,
+        )
+
+        # Fetch list of files.
+        blobs_to_transform = source_hook.list_by_timespan(
+            bucket_name=self.source_bucket,
+            prefix=source_prefix_interp,
+            timespan_start=timespan_start,
+            timespan_end=timespan_end,
+        )
+
+        with TemporaryDirectory() as temp_input_dir, TemporaryDirectory() as 
temp_output_dir:
+            temp_input_dir = Path(temp_input_dir)
+            temp_output_dir = Path(temp_output_dir)
+
+            # TODO: download in parallel.
+            for blob_to_transform in blobs_to_transform:
+                destination_file = temp_input_dir / blob_to_transform
+                destination_file.parent.mkdir(parents=True, exist_ok=True)
+                source_hook.download(
+                    bucket_name=self.source_bucket,
+                    object_name=blob_to_transform,
+                    filename=str(destination_file),
+                )

Review comment:
       I'm not a fan of such operators. If single download or any other 
operation fails in the `for` loop whole operation will fail. This is not best 
way to assure idempotency I think. Also downloading multiple files to single 
temp dir my create some memory issues.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to