ferruzzi commented on code in PR #30501:
URL: https://github.com/apache/airflow/pull/30501#discussion_r1178335915


##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -87,6 +93,10 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
     :param dynamodb_scan_kwargs: kwargs passed to <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.scan>
     :param s3_key_prefix: Prefix of s3 object key
     :param process_func: How we transform a dynamodb item to bytes. By default we dump the json
+    :param ExportTime: Time in the past from which to export table data, counted in seconds from the start of
Review Comment:
   ```suggestion
       :param export_time: Time in the past from which to export table data, counted in seconds from the start of
   ```



##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -118,11 +130,54 @@ def __init__(
         self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
         self.s3_bucket_name = s3_bucket_name
         self.s3_key_prefix = s3_key_prefix
+        self.export_time = export_time
+        self.export_format = export_format
 
-    def execute(self, context: Context) -> None:
-        hook = DynamoDBHook(aws_conn_id=self.source_aws_conn_id)
-        table = hook.get_conn().Table(self.dynamodb_table_name)
+        if self.export_time and self.export_time > datetime.now():
+            raise ValueError("The export_time parameter cannot be a future time.")
 
+    @cached_property
+    def hook(self):
+        """Create DynamoDBHook"""
+        return DynamoDBHook(aws_conn_id=self.source_aws_conn_id)
+
+    def execute(self, context: Context) -> None:
+        if self.export_time:
+            self._export_table_to_point_in_time()
+        else:
+            self._export_entire_data()
+
+    def _export_table_to_point_in_time(self):
+        """
+        Export data from the start of the epoch until `export_time`. The table export will be a snapshot of the table's
+        state at this point in time.
+        """
+        client = self.hook.conn.meta.client

Review Comment:
   Not for now, but boy is this ugly; we should come up with a cleaner way to fetch the client at some point.
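   One (hypothetical) way to tidy this up later would be a small property on the hook, so callers never touch `.meta.client` directly; just a sketch, not part of this PR:

   ```python
   from functools import cached_property

   from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook


   class DynamoDBHookWithClient(DynamoDBHook):
       """Sketch only: expose the resource's low-level client directly."""

       @cached_property
       def client(self):
           # A boto3 Resource keeps its low-level client at .meta.client;
           # surfacing it here lets callers write hook.client instead of
           # hook.conn.meta.client.
           return self.conn.meta.client
   ```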



##########
airflow/providers/amazon/aws/waiters/README.md:
##########
@@ -98,3 +98,5 @@ EksHook().get_waiter("all_nodegroups_deleted").wait(clusterName=cluster_name)
 
 Note that since the get_waiter is in the hook instead of on the client side, a custom waiter is
 just `hook.get_waiter` and not `hook.conn.get_waiter`.  Other than that, they should be identical.
+
+Note the custom waiter doesn't work with resource_type; only client_type is supported.

Review Comment:
   Thanks for adding this.  As we discussed on Slack, we could tackle this a few ways, but none of them should be considered blocking for this PR.  As a temporary Airflow-based fix, I like the idea of adding some logic to the Base AWS Hook's "get waiter" method that checks whether the call is being made against a Resource and creates a client connection to handle it if that is the case.  Longer term, it would be nice to see why botocore doesn't support this natively and maybe submit a PR to add it... but the hook solution is something we could do quickly in code we own.
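   A minimal sketch of that check (hypothetical; not the actual `get_waiter` code):

   ```python
   from boto3.resources.base import ServiceResource


   def resolve_waiter_client(conn):
       """Return a client suitable for building custom waiters.

       Sketch only: if the hook connection is a boto3 Resource (resource_type
       hooks), borrow its underlying low-level client; a client connection is
       returned unchanged.
       """
       if isinstance(conn, ServiceResource):
           return conn.meta.client
       return conn
   ```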



##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -118,11 +130,54 @@ def __init__(
         self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
         self.s3_bucket_name = s3_bucket_name
         self.s3_key_prefix = s3_key_prefix
+        self.export_time = export_time
+        self.export_format = export_format
 
-    def execute(self, context: Context) -> None:
-        hook = DynamoDBHook(aws_conn_id=self.source_aws_conn_id)
-        table = hook.get_conn().Table(self.dynamodb_table_name)
+        if self.export_time and self.export_time > datetime.now():
+            raise ValueError("The export_time parameter cannot be a future time.")
 
+    @cached_property
+    def hook(self):
+        """Create DynamoDBHook"""
+        return DynamoDBHook(aws_conn_id=self.source_aws_conn_id)
+
+    def execute(self, context: Context) -> None:
+        if self.export_time:
+            self._export_table_to_point_in_time()
+        else:
+            self._export_entire_data()
+
+    def _export_table_to_point_in_time(self):
+        """
+        Export data from the start of the epoch until `export_time`. The table export will be a snapshot of the table's
+        state at this point in time.
+        """
+        client = self.hook.conn.meta.client
+        response = client.export_table_to_point_in_time(
+            TableArn=self.dynamodb_table_name,
+            ExportTime=self.export_time,
+            S3Bucket=self.s3_bucket_name,
+            S3Prefix=self.s3_key_prefix,
+            ExportFormat=self.export_format,
+        )
+        credentials = self.hook.get_credentials()
+        waiter = self.hook.get_waiter(
+            CUSTOM_WAITER_NAME,
+            client=boto3.client(
+                "dynamodb",
+                region_name=client.meta.region_name,
+                aws_access_key_id=credentials.access_key,
+                aws_secret_access_key=credentials.secret_key,
+            ),
+        )
+        export_arn = response.get("ExportDescription", {}).get("ExportArn")
+        waiter.wait(
+            ExportArn=export_arn,
+        )

Review Comment:
   (Non-blocking)  The pre-commit/static checks broke this into three lines because of the trailing comma.  If you remove that, this will get collapsed into a one-liner.
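   E.g. (illustrative):

   ```suggestion
           waiter.wait(ExportArn=export_arn)
   ```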



##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -118,11 +130,54 @@ def __init__(
         self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
         self.s3_bucket_name = s3_bucket_name
         self.s3_key_prefix = s3_key_prefix
+        self.export_time = export_time
+        self.export_format = export_format
 
-    def execute(self, context: Context) -> None:
-        hook = DynamoDBHook(aws_conn_id=self.source_aws_conn_id)
-        table = hook.get_conn().Table(self.dynamodb_table_name)
+        if self.export_time and self.export_time > datetime.now():
+            raise ValueError("The export_time parameter cannot be a future time.")
 
+    @cached_property
+    def hook(self):
+        """Create DynamoDBHook"""
+        return DynamoDBHook(aws_conn_id=self.source_aws_conn_id)
+
+    def execute(self, context: Context) -> None:
+        if self.export_time:
+            self._export_table_to_point_in_time()
+        else:
+            self._export_entire_data()
+
+    def _export_table_to_point_in_time(self):
+        """
+        Export data from the start of the epoch until `export_time`. The table export will be a snapshot of the table's
+        state at this point in time.
+        """
+        client = self.hook.conn.meta.client
+        response = client.export_table_to_point_in_time(
+            TableArn=self.dynamodb_table_name,
+            ExportTime=self.export_time,
+            S3Bucket=self.s3_bucket_name,
+            S3Prefix=self.s3_key_prefix,
+            ExportFormat=self.export_format,
+        )
+        credentials = self.hook.get_credentials()
+        waiter = self.hook.get_waiter(
+            CUSTOM_WAITER_NAME,

Review Comment:
   (Non-blocking) I appreciate the constant here instead of a magic string, but in this case I'd likely inline it.  We usually use waiter names directly elsewhere and haven't had a problem with that.  But I'll leave it up to you.
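   E.g. (using "export_table" as a placeholder, since the actual value of `CUSTOM_WAITER_NAME` isn't shown in this hunk):

   ```suggestion
               "export_table",
   ```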


