sschaetz commented on a change in pull request #13996:
URL: https://github.com/apache/airflow/pull/13996#discussion_r580062200
##########
File path: airflow/providers/google/cloud/operators/gcs.py
##########
@@ -656,6 +659,216 @@ def execute(self, context: dict) -> None:
)
+class GCSTimeSpanFileTransformOperator(BaseOperator):
+    """
+    Determines a list of objects that were added or modified at a GCS source
+    location during a specific time-span, copies them to a temporary location
+    on the local file system, runs a transform on these files as specified by
+    the transformation script, and uploads the output to the destination bucket.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GCSTimeSpanFileTransformOperator`
+
+    The locations of the source and the destination files in the local
+    filesystem are provided as the first and second arguments to the
+    transformation script. The time-span is passed to the transformation
+    script as the third and fourth arguments, as UTC ISO 8601 strings.
+
+    The transformation script is expected to read the data from the source,
+    transform it, and write the output to the local destination file.
+
+    :param source_bucket: The bucket to fetch data from. (templated)
+    :type source_bucket: str
+    :param source_prefix: Prefix string which filters objects whose names begin with
+        this prefix. Can interpolate execution date and time components. (templated)
+    :type source_prefix: str
+    :param source_gcp_conn_id: The connection ID to use connecting to Google Cloud
+        to download files to be processed.
+    :type source_gcp_conn_id: str
+    :param source_impersonation_chain: Optional service account to impersonate using short-term
+        credentials (to download files to be processed), or chained list of accounts required to
+        get the access_token of the last account in the list, which will be impersonated in the
+        request. If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type source_impersonation_chain: Union[str, Sequence[str]]
+
+    :param destination_bucket: The bucket to write data to. (templated)
+    :type destination_bucket: str
+    :param destination_prefix: Prefix string for the upload location.
+        Can interpolate execution date and time components. (templated)
+    :type destination_prefix: str
+    :param destination_gcp_conn_id: The connection ID to use connecting to Google Cloud
+        to upload processed files.
+    :type destination_gcp_conn_id: str
+    :param destination_impersonation_chain: Optional service account to impersonate using short-term
+        credentials (to upload processed files), or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type destination_impersonation_chain: Union[str, Sequence[str]]
+
+    :param transform_script: location of the executable transformation script or list of arguments
+        passed to subprocess ex. `['python', 'script.py', 10]`. (templated)
+    :type transform_script: Union[str, List[str]]
+
+    """
+
+    template_fields = (
+        'source_bucket',
+        'source_prefix',
+        'destination_bucket',
+        'destination_prefix',
+        'transform_script',
+        'source_impersonation_chain',
+        'destination_impersonation_chain',
+    )
+
+    @staticmethod
+    def interpolate_prefix(prefix, dt):
+        """Interpolate the prefix with the given datetime.
+
+        :param prefix: The prefix to interpolate
+        :type prefix: str
+        :param dt: The datetime used to interpolate the prefix (via ``strftime``)
+        :type dt: datetime
+
+        """
+        return None if prefix is None else dt.strftime(prefix)
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        source_bucket: str,
+        source_prefix: str,
+        source_gcp_conn_id: str,
+        destination_bucket: str,
+        destination_prefix: str,
+        destination_gcp_conn_id: str,
+        transform_script: Union[str, List[str]],
+        source_impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        destination_impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.source_bucket = source_bucket
+        self.source_prefix = source_prefix
+        self.source_gcp_conn_id = source_gcp_conn_id
+        self.source_impersonation_chain = source_impersonation_chain
+
+        self.destination_bucket = destination_bucket
+        self.destination_prefix = destination_prefix
+        self.destination_gcp_conn_id = destination_gcp_conn_id
+        self.destination_impersonation_chain = destination_impersonation_chain
+
+        self.transform_script = transform_script
+        self.output_encoding = sys.getdefaultencoding()
+
+    def execute(self, context: dict) -> None:
+        # Define intervals and prefixes.
+        timespan_start = context["execution_date"]
+        timespan_end = context["dag"].following_schedule(timespan_start)
+        if timespan_end is None:
+            timespan_end = datetime.datetime.max
+            self.log.warning("No following schedule found, setting timespan end to max %s", timespan_end)
+
+        timespan_start = timespan_start.replace(tzinfo=timezone.utc)
+        timespan_end = timespan_end.replace(tzinfo=timezone.utc)
+
+        source_prefix_interp = GCSTimeSpanFileTransformOperator.interpolate_prefix(
+            self.source_prefix,
+            timespan_start,
+        )
+        destination_prefix_interp = GCSTimeSpanFileTransformOperator.interpolate_prefix(
+            self.destination_prefix,
+            timespan_start,
+        )
+
+        source_hook = GCSHook(
+            gcp_conn_id=self.source_gcp_conn_id,
+            impersonation_chain=self.source_impersonation_chain,
+        )
+        destination_hook = GCSHook(
+            gcp_conn_id=self.destination_gcp_conn_id,
+            impersonation_chain=self.destination_impersonation_chain,
+        )
+
+        # Fetch list of files.
+        blobs_to_transform = source_hook.list_by_timespan(
+            bucket_name=self.source_bucket,
+            prefix=source_prefix_interp,
+            timespan_start=timespan_start,
+            timespan_end=timespan_end,
+        )
+
+        with TemporaryDirectory() as temp_input_dir, TemporaryDirectory() as temp_output_dir:
+            temp_input_dir = Path(temp_input_dir)
+            temp_output_dir = Path(temp_output_dir)
+
+            # TODO: download in parallel.
+            for blob_to_transform in blobs_to_transform:
+                destination_file = temp_input_dir / blob_to_transform
+                destination_file.parent.mkdir(parents=True, exist_ok=True)
+                source_hook.download(
+                    bucket_name=self.source_bucket,
+                    object_name=blob_to_transform,
+                    filename=str(destination_file),
+                )
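
For context, a minimal transformation script matching the interface described in the docstring above might look like the following. This is a hypothetical sketch, not part of the PR: it assumes line-oriented text input, and that the first two arguments are the local source and destination paths while the last two are the timespan bounds as UTC ISO 8601 strings. Note also that `interpolate_prefix` simply applies `strftime`, so a prefix such as `data/%Y-%m-%d/` interpolates to e.g. `data/2021-02-20/`.

```python
# transform.py - hypothetical example of the script interface described above.
import sys

if __name__ == "__main__":
    # Argument order per the docstring: source path, destination path,
    # timespan start, timespan end (the last two as UTC ISO 8601 strings).
    source, destination, timespan_start, timespan_end = sys.argv[1:5]
    with open(source) as src, open(destination, "w") as dst:
        for line in src:
            # Uppercase each line and tag it with the timespan it was processed for.
            dst.write(f"{timespan_start},{timespan_end},{line.upper()}")
```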
Review comment:
Thanks for looking at this in more detail.

> This is not best way to assure idempotency I think

I don't see a difference between multiple files and a single file with regard to idempotency. If a single download fails, the entire task fails and nothing is produced. What I plan to follow up with is a retry mechanism for the GCS download and upload; in production, retrying has significantly increased the reliability of our tasks that interact with GCS.
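
A sketch of what such a retry wrapper could look like, using `tenacity` (already an Airflow dependency); the helper name and parameters here are illustrative, not part of this PR:

```python
from tenacity import retry, stop_after_attempt, wait_exponential

# Illustrative sketch: retry transient GCS failures with exponential backoff
# before giving up and failing the task.
@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=30), reraise=True)
def download_with_retry(hook, bucket_name, object_name, filename):
    hook.download(bucket_name=bucket_name, object_name=object_name, filename=filename)
```

Inside `execute`, the existing `source_hook.download(...)` call would simply go through a wrapper like this.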
> Also downloading multiple files to single temp dir may create some memory issues.

Indeed, this can be an issue. The loop could be changed to `download single file -> process single file`, but then we could not take advantage of parallel downloads, which is something I want to follow up with as well.
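
A possible shape for that parallel download follow-up, sketched with `concurrent.futures`; whether the shared hook and its underlying client are safe to call from multiple threads would need to be verified first:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

# Illustrative sketch: download the listed blobs concurrently; any single
# failure is re-raised so the task still fails as a whole.
def download_all(hook, bucket_name, blobs, temp_input_dir, max_workers=8):
    def _download(blob):
        destination_file = temp_input_dir / blob
        destination_file.parent.mkdir(parents=True, exist_ok=True)
        hook.download(bucket_name=bucket_name, object_name=blob, filename=str(destination_file))

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(_download, blob) for blob in blobs]
        for future in as_completed(futures):
            future.result()  # propagate the first exception, if any
```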
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]