Ghoul-SSZ commented on code in PR #41304:
URL: https://github.com/apache/airflow/pull/41304#discussion_r1715921353


##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -148,29 +162,53 @@ def hook(self):
         return DynamoDBHook(aws_conn_id=self.source_aws_conn_id)
 
     def execute(self, context: Context) -> None:
-        if self.export_time:
+        # There are 2 separate export-to-point-in-time configurations:
+        # 1. Full export, which takes the export_time arg.
+        # 2. Incremental export, which takes the incremental_export_... args.
+        # Hence, export_time cannot be used as the proper indicator for the 
`_export_table_to_point_in_time`
+        # function. This change introduces a new boolean as the indicator for 
whether the operator scans
+        # and exports the entire table or uses the point-in-time export 
functionality.
+        if self.point_in_time_export or self.export_time:
             self._export_table_to_point_in_time()
         else:
             self._export_entire_data()
 
     def _export_table_to_point_in_time(self):
         """
-        Export data from start of epoc till `export_time`.
+        Export data to point in time.
 
+        Full export exports data from the start of the epoch till `export_time`.
         Table export will be a snapshot of the table's state at this point in 
time.
+
+        Incremental export exports the data from a specific datetime to a 
specific datetime.
+
+
+        Note: S3BucketOwner is a required parameter when exporting to an S3 
bucket in another account.
         if self.export_time and self.export_time > 
datetime.now(self.export_time.tzinfo):
             raise ValueError("The export_time parameter cannot be a future 
time.")
 
         client = self.hook.conn.meta.client
         table_description = 
client.describe_table(TableName=self.dynamodb_table_name)
-        response = client.export_table_to_point_in_time(
-            TableArn=table_description.get("Table", {}).get("TableArn"),
-            ExportTime=self.export_time,
-            S3Bucket=self.s3_bucket_name,
-            S3Prefix=self.s3_key_prefix,
-            ExportFormat=self.export_format,
-        )
+
+        export_table_to_point_in_time_base_args = {
+            "TableArn": table_description.get("Table", {}).get("TableArn"),
+            "ExportTime": self.export_time,
+            "S3Bucket": self.s3_bucket_name,
+            "S3Prefix": self.s3_key_prefix,
+            "S3BucketOwner": self.s3_bucket_owner,
+            "ExportFormat": self.export_format,
+        }
+        export_table_to_point_in_time_args = {
+            **export_table_to_point_in_time_base_args,
+            **self.export_table_to_point_in_time_kwargs,
+        }
+
+        args_filtered = {
+            key: value for key, value in 
export_table_to_point_in_time_args.items() if value is not None
+        }

Review Comment:
   ah nice! didn't know we have those.
   Made the change now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to