ferruzzi commented on code in PR #30501:
URL: https://github.com/apache/airflow/pull/30501#discussion_r1178335915
##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -87,6 +93,10 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
:param dynamodb_scan_kwargs: kwargs pass to
<https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.scan>
:param s3_key_prefix: Prefix of s3 object key
:param process_func: How we transform a dynamodb item to bytes. By default we dump the json
+ :param ExportTime: Time in the past from which to export table data, counted in seconds from the start of
Review Comment:
```suggestion
:param export_time: Time in the past from which to export table data, counted in seconds from the start of
```
##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -118,11 +130,54 @@ def __init__(
self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
self.s3_bucket_name = s3_bucket_name
self.s3_key_prefix = s3_key_prefix
+ self.export_time = export_time
+ self.export_format = export_format
- def execute(self, context: Context) -> None:
- hook = DynamoDBHook(aws_conn_id=self.source_aws_conn_id)
- table = hook.get_conn().Table(self.dynamodb_table_name)
+ if self.export_time and self.export_time > datetime.now():
+ raise ValueError("The export_time parameter cannot be a future time.")
+ @cached_property
+ def hook(self):
+ """Create DynamoDBHook"""
+ return DynamoDBHook(aws_conn_id=self.source_aws_conn_id)
+
+ def execute(self, context: Context) -> None:
+ if self.export_time:
+ self._export_table_to_point_in_time()
+ else:
+ self._export_entire_data()
+
+ def _export_table_to_point_in_time(self):
+ """
+ Export data from start of epoch till `export_time`. Table export will be a snapshot of the table's
+ state at this point in time.
+ """
+ client = self.hook.conn.meta.client
Review Comment:
Not for now, but boy is this ugly; we should come up with a better way of
fetching the client at some point.
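One non-authoritative way that could look (a sketch only; `HookSketch` is a stand-in, not the real Airflow hook class): expose the low-level client as a cached property on the hook, so operators can write `self.hook.client` instead of reaching through `conn.meta.client` at every call site.

```python
from functools import cached_property


class HookSketch:
    """Illustrative stand-in for an AWS hook whose ``conn`` is a boto3 Resource."""

    def __init__(self, conn):
        self.conn = conn  # in the real hook this would be the boto3 resource

    @cached_property
    def client(self):
        # Reach through the resource to its low-level client once and cache it,
        # so call sites never have to spell out ``conn.meta.client``.
        return self.conn.meta.client
```

With that in place, `_export_table_to_point_in_time` could start with `client = self.hook.client`.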
##########
airflow/providers/amazon/aws/waiters/README.md:
##########
@@ -98,3 +98,5 @@
EksHook().get_waiter("all_nodegroups_deleted").wait(clusterName=cluster_name)
Note that since the get_waiter is in the hook instead of on the client side, a custom waiter is
just `hook.get_waiter` and not `hook.conn.get_waiter`. Other than that, they should be identical.
+
+Note the custom waiter doesn't work with resource_type; only client_type is supported.
Review Comment:
Thanks for adding this. As we discussed on Slack, we could tackle this a
few ways, but none of them should be considered blocking for this PR. As a
temporary Airflow-based fix, I like the idea of adding some logic to Base AWS
Hook's `get_waiter` method that checks whether the call is being made against a
Resource and creates a client connection to handle it if that is the case.
Longer term, it would be nice to see why botocore doesn't support this
natively and maybe submit a PR to add it... but the hook solution is
something we could do quickly in code we own.
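A rough sketch of that hook-side check (names and placement assumed, not the actual Base AWS Hook code): a boto3 Resource exposes its low-level client at `conn.meta.client`, so a `get_waiter` helper could detect that shape by duck typing and attach the waiter to the underlying client instead.

```python
def waiter_client(conn):
    """Return an object botocore waiters can attach to.

    If ``conn`` looks like a boto3 Resource (it carries ``meta.client``),
    unwrap it to the underlying low-level client; otherwise assume it is
    already a client and return it unchanged. Duck typing avoids importing
    botocore types just for an isinstance check.
    """
    client = getattr(getattr(conn, "meta", None), "client", None)
    return client if client is not None else conn
```

The hook's `get_waiter` could then build the waiter model against `waiter_client(self.conn)`, which would also remove the need for the manual `boto3.client(...)` construction in the operator above.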
##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -118,11 +130,54 @@ def __init__(
self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
self.s3_bucket_name = s3_bucket_name
self.s3_key_prefix = s3_key_prefix
+ self.export_time = export_time
+ self.export_format = export_format
- def execute(self, context: Context) -> None:
- hook = DynamoDBHook(aws_conn_id=self.source_aws_conn_id)
- table = hook.get_conn().Table(self.dynamodb_table_name)
+ if self.export_time and self.export_time > datetime.now():
+ raise ValueError("The export_time parameter cannot be a future time.")
+ @cached_property
+ def hook(self):
+ """Create DynamoDBHook"""
+ return DynamoDBHook(aws_conn_id=self.source_aws_conn_id)
+
+ def execute(self, context: Context) -> None:
+ if self.export_time:
+ self._export_table_to_point_in_time()
+ else:
+ self._export_entire_data()
+
+ def _export_table_to_point_in_time(self):
+ """
+ Export data from start of epoch till `export_time`. Table export will be a snapshot of the table's
+ state at this point in time.
+ """
+ client = self.hook.conn.meta.client
+ response = client.export_table_to_point_in_time(
+ TableArn=self.dynamodb_table_name,
+ ExportTime=self.export_time,
+ S3Bucket=self.s3_bucket_name,
+ S3Prefix=self.s3_key_prefix,
+ ExportFormat=self.export_format,
+ )
+ credentials = self.hook.get_credentials()
+ waiter = self.hook.get_waiter(
+ CUSTOM_WAITER_NAME,
+ client=boto3.client(
+ "dynamodb",
+ region_name=client.meta.region_name,
+ aws_access_key_id=credentials.access_key,
+ aws_secret_access_key=credentials.secret_key,
+ ),
+ )
+ export_arn = response.get("ExportDescription", {}).get("ExportArn")
+ waiter.wait(
+ ExportArn=export_arn,
+ )
Review Comment:
(Non-blocking) The pre-commit/static checks broke this into three lines
because of the trailing comma. If you remove that, this will get collapsed
into a one-liner.
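For illustration (the `waiter` and `export_arn` names come from the diff above; the wrapper function is a stub): the trailing comma is what makes Black keep the call expanded.

```python
def wait_for_export(waiter, export_arn):
    # With a trailing comma, Black keeps the call on three lines:
    #     waiter.wait(
    #         ExportArn=export_arn,
    #     )
    # Without the trailing comma, the same call collapses to a one-liner:
    waiter.wait(ExportArn=export_arn)
```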
##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -118,11 +130,54 @@ def __init__(
self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
self.s3_bucket_name = s3_bucket_name
self.s3_key_prefix = s3_key_prefix
+ self.export_time = export_time
+ self.export_format = export_format
- def execute(self, context: Context) -> None:
- hook = DynamoDBHook(aws_conn_id=self.source_aws_conn_id)
- table = hook.get_conn().Table(self.dynamodb_table_name)
+ if self.export_time and self.export_time > datetime.now():
+ raise ValueError("The export_time parameter cannot be a future time.")
+ @cached_property
+ def hook(self):
+ """Create DynamoDBHook"""
+ return DynamoDBHook(aws_conn_id=self.source_aws_conn_id)
+
+ def execute(self, context: Context) -> None:
+ if self.export_time:
+ self._export_table_to_point_in_time()
+ else:
+ self._export_entire_data()
+
+ def _export_table_to_point_in_time(self):
+ """
+ Export data from start of epoch till `export_time`. Table export will be a snapshot of the table's
+ state at this point in time.
+ """
+ client = self.hook.conn.meta.client
+ response = client.export_table_to_point_in_time(
+ TableArn=self.dynamodb_table_name,
+ ExportTime=self.export_time,
+ S3Bucket=self.s3_bucket_name,
+ S3Prefix=self.s3_key_prefix,
+ ExportFormat=self.export_format,
+ )
+ credentials = self.hook.get_credentials()
+ waiter = self.hook.get_waiter(
+ CUSTOM_WAITER_NAME,
Review Comment:
(Non-blocking) I appreciate the constant here instead of a magic string, but
I think in this case I'd likely inline it. I feel like we usually use waiter
names directly and don't have a problem with it here. But I'll leave it up to
you.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]