vincbeck commented on code in PR #30501:
URL: https://github.com/apache/airflow/pull/30501#discussion_r1160064996
##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -118,11 +123,51 @@ def __init__(
self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
self.s3_bucket_name = s3_bucket_name
self.s3_key_prefix = s3_key_prefix
+ self.export_time = export_time
+ self.export_format = export_format
def execute(self, context: Context) -> None:
hook = DynamoDBHook(aws_conn_id=self.source_aws_conn_id)
Review Comment:
Instead of creating it here and passing it to the private methods, I would use
`@cached_property` to create it. Here is an example:
https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/operators/emr.py#L402
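For illustration, a minimal sketch of what that could look like in this operator (the `hook` property name and the no-argument private method signatures are assumptions, not part of the PR):
```python
from functools import cached_property

from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook


class DynamoDBToS3Operator(AwsToAwsBaseOperator):
    ...

    @cached_property
    def hook(self) -> DynamoDBHook:
        # Created lazily on first access and cached for subsequent calls.
        return DynamoDBHook(aws_conn_id=self.source_aws_conn_id)

    def execute(self, context: Context) -> None:
        # The private methods can then read self.hook directly instead of
        # receiving the hook as an argument.
        if self.export_time:
            self._export_table_to_point_in_time()
        else:
            self._export_entire_data()
```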
##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -87,6 +89,7 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
:param dynamodb_scan_kwargs: kwargs pass to
<https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.scan>
:param s3_key_prefix: Prefix of s3 object key
:param process_func: How we transforms a dynamodb item to bytes. By
default we dump the json
+ :param ExportTime: Time in the past from which to export table data,
counted in seconds from the start of the Unix epoch. The table export will be a
snapshot of the table’s state at this point in time.
Review Comment:
```suggestion
:param export_time: Timestamp in the past from which to export table
data (in seconds). The table export will be a snapshot of the table’s state at
this point in time.
```
##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -118,11 +123,51 @@ def __init__(
self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
self.s3_bucket_name = s3_bucket_name
self.s3_key_prefix = s3_key_prefix
+ self.export_time = export_time
+ self.export_format = export_format
def execute(self, context: Context) -> None:
hook = DynamoDBHook(aws_conn_id=self.source_aws_conn_id)
- table = hook.get_conn().Table(self.dynamodb_table_name)
+ if self.export_time:
+ self._export_table_to_point_in_time(hook=hook)
+ else:
+ self._export_entire_data(hook=hook)
+
+ def _export_table_to_point_in_time(self, hook: DynamoDBHook):
+ """
+ Export data from start of epoc till `export_time`. Table export will
be a snapshot of the table’s
+ state at this point in time.
+ """
+ terminal_status = ['COMPLETED', 'FAILED']
+ sleep_time = 30 # unit: seconds
+ client = hook.conn.meta.client
+ while True:
+ response = client.export_table_to_point_in_time(
+ TableArn=self.dynamodb_table_name,
+ ExportTime=self.export_time,
+ ClientToken='string',
Review Comment:
In my opinion, you don't need to provide it.
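If I read the boto3 docs correctly, `ClientToken` is auto-populated when omitted, so the call could simply be (a sketch, reusing the attributes already set in `__init__`):
```python
# Sketch: omit ClientToken and let boto3 generate one automatically.
response = client.export_table_to_point_in_time(
    TableArn=self.dynamodb_table_name,
    ExportTime=self.export_time,
    S3Bucket=self.s3_bucket_name,
    S3Prefix=self.s3_key_prefix,
    ExportFormat=self.export_format,
)
```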
##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -118,11 +123,51 @@ def __init__(
self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
self.s3_bucket_name = s3_bucket_name
self.s3_key_prefix = s3_key_prefix
+ self.export_time = export_time
+ self.export_format = export_format
def execute(self, context: Context) -> None:
hook = DynamoDBHook(aws_conn_id=self.source_aws_conn_id)
- table = hook.get_conn().Table(self.dynamodb_table_name)
+ if self.export_time:
+ self._export_table_to_point_in_time(hook=hook)
+ else:
+ self._export_entire_data(hook=hook)
+
+ def _export_table_to_point_in_time(self, hook: DynamoDBHook):
Review Comment:
I would change the logic of this function: wait for the export to complete only
if the user asks for it via a new `wait_for_completion` flag. If `False`, just
trigger the export without waiting. If `True`, wait for the export, but use a
waiter to do so rather than a `while True` loop. See
[here](https://github.com/apache/airflow/tree/main/airflow/providers/amazon/aws/waiters)
for how to create a custom waiter.
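A rough sketch of the shape I have in mind (the `wait_for_completion` attribute and the `"export_table"` waiter name are hypothetical; the waiter would need to be added as a custom waiter config in the provider):
```python
def _export_table_to_point_in_time(self) -> None:
    """Trigger a point-in-time export and optionally wait for it to finish."""
    client = self.hook.conn.meta.client
    response = client.export_table_to_point_in_time(
        TableArn=self.dynamodb_table_name,
        ExportTime=self.export_time,
        S3Bucket=self.s3_bucket_name,
        S3Prefix=self.s3_key_prefix,
        ExportFormat=self.export_format,
    )
    if self.wait_for_completion:
        # Hypothetical custom waiter: a JSON config under
        # airflow/providers/amazon/aws/waiters/ that polls DescribeExport
        # until ExportStatus reaches COMPLETED or FAILED.
        self.hook.get_waiter("export_table").wait(
            ExportArn=response["ExportDescription"]["ExportArn"]
        )
```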
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]