ferruzzi commented on code in PR #41304:
URL: https://github.com/apache/airflow/pull/41304#discussion_r1719085240
##########
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py:
##########
@@ -137,8 +145,10 @@ def __init__(
self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
self.s3_bucket_name = s3_bucket_name
self.s3_key_prefix = s3_key_prefix
+ self.point_in_time_export = point_in_time_export
self.export_time = export_time
self.export_format = export_format
+ self.export_table_to_point_in_time_kwargs = export_table_to_point_in_time_kwargs
Review Comment:
This needs to be
```
self.export_table_to_point_in_time_kwargs = export_table_to_point_in_time_kwargs or {}
```
otherwise, if `None` is passed in, line 199 fails because `None` can't be unpacked with `**`.
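A minimal sketch of why the `or {}` fallback matters (the helper names here are hypothetical, not part of the provider): unpacking `None` with `**` raises a `TypeError`, while an empty dict unpacks cleanly.

```python
def export_table(**kwargs):
    # Stand-in for the boto3 call that receives the unpacked kwargs.
    return kwargs

def build_call(export_table_to_point_in_time_kwargs=None):
    # Mirrors the suggested fix: fall back to {} so ** unpacking succeeds.
    safe_kwargs = export_table_to_point_in_time_kwargs or {}
    return export_table(**safe_kwargs)

# Without the fallback, this is what line 199 would hit:
try:
    export_table(**None)
except TypeError:
    print("unpacking None raises TypeError")

print(build_call())  # safe: returns {}
print(build_call({"ExportType": "FULL_EXPORT"}))
```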
##########
tests/system/providers/amazon/aws/example_dynamodb_to_s3.py:
##########
@@ -162,18 +171,39 @@ def delete_dynamodb_table(table_name: str):
# [END howto_transfer_dynamodb_to_s3_segmented]
export_time = get_export_time(table_name)
- # [START howto_transfer_dynamodb_to_s3_in_some_point_in_time]
- backup_db_to_point_in_time = DynamoDBToS3Operator(
- task_id="backup_db_to_point_in_time",
+ latest_export_time = get_latest_export_time(table_name)
+ # [START howto_transfer_dynamodb_to_s3_in_some_point_in_time_full_export]
+ backup_db_to_point_in_time_full_export = DynamoDBToS3Operator(
+ task_id="backup_db_to_point_in_time_full_export",
dynamodb_table_name=table_name,
- file_size=1000,
s3_bucket_name=bucket_name,
+ point_in_time_export=True,
export_time=export_time,
s3_key_prefix=f"{S3_KEY_PREFIX}-3-",
)
- # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time]
+ # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time_full_export]
+
+ # [START howto_transfer_dynamodb_to_s3_in_some_point_in_time_incremental_export]
+ backup_db_to_point_in_time_incremental_export = DynamoDBToS3Operator(
+ task_id="backup_db_to_point_in_time_incremental_export",
+ dynamodb_table_name=table_name,
+ s3_bucket_name=bucket_name,
+ point_in_time_export=True,
+ s3_key_prefix=f"{S3_KEY_PREFIX}-4-",
+ export_table_to_point_in_time_kwargs={
+ "ExportType": "INCREMENTAL_EXPORT",
+ "IncrementalExportSpecification": {
+ "ExportFromTime": export_time,
+ "ExportToTime": latest_export_time,
Review Comment:
This is an issue I have run out of time on for today. With the other two fixes applied locally, the test keeps failing, and I've tried a number of solutions. Here's what I've found so far:

- error: "from" time can't be equal to "to" time
  - tried: set "from" to `export_time - 1_minute`
- error: "from" can't be less than table creation time
  - tried: revert the above and set "to" to `latest_export_time + 1_minute`
- error: difference between "from" time and "to" time is less than 15 minutes
  - tried: revert the above and set "to" to `latest_export_time + 16_minutes`
- error: "to" time cannot be greater than the current time
  - tried: revert the above and set "to" time to `utcnow()`
- error: this gets set at parse time, so by execution "to" time is in the past

We will either need to figure out a way to make this happy, or set that task to not run using an unreachable trigger_rule or a branch operator to skip it.
##########
tests/system/providers/amazon/aws/example_dynamodb_to_s3.py:
##########
@@ -195,7 +225,8 @@ def delete_dynamodb_table(table_name: str):
backup_db_segment_1,
backup_db_segment_2,
export_time,
- backup_db_to_point_in_time,
+ backup_db_to_point_in_time_full_export,
Review Comment:
Insert `latest_export_time` between these two lines:
```
export_time,
latest_export_time,
backup_db_to_point_in_time_full_export,
```
Without it listed in the chain(), it fires off at the very beginning and
causes the test to fail because env_id isn't populated yet.
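A rough sketch of why the missing entry matters, using a plain-Python stand-in for Airflow's `chain()` (the real one wires `set_downstream` between operators; this toy version just records each item's upstream):

```python
def chain(*tasks):
    # Toy model of airflow.models.baseoperator.chain: each item becomes
    # downstream of the item before it.
    deps: dict[str, list[str]] = {}
    for upstream, downstream in zip(tasks, tasks[1:]):
        deps.setdefault(downstream, []).append(upstream)
    return deps

# With latest_export_time listed, it can only run after export_time,
# by which point env_id has been populated.
deps = chain(
    "export_time",
    "latest_export_time",
    "backup_db_to_point_in_time_full_export",
)
```

A task left out of `chain()` has no upstream at all, so the scheduler is free to run it immediately, which is exactly the premature firing described above.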
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]