This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new dff7e0de36 Revert "DynamoDBToS3Operator - Add feature to export table to a point in time (#30501)" (#31139)
dff7e0de36 is described below
commit dff7e0de362e4cd318d7c285ec102923503eceb3
Author: D. Ferruzzi <[email protected]>
AuthorDate: Mon May 8 15:38:52 2023 -0700
Revert "DynamoDBToS3Operator - Add feature to export table to a point in
time (#30501)" (#31139)
This reverts commit fc4166127a1d2099d358fee1ea10662838cf9cf3.
---
.../amazon/aws/transfers/dynamodb_to_s3.py | 46 ++---------------
airflow/providers/amazon/aws/waiters/README.md | 2 -
airflow/providers/amazon/aws/waiters/dynamodb.json | 30 ------------
.../transfer/dynamodb_to_s3.rst | 8 ---
.../amazon/aws/transfers/test_dynamodb_to_s3.py | 41 ----------------
.../amazon/aws/waiters/test_custom_waiters.py | 57 ----------------------
.../providers/amazon/aws/example_dynamodb_to_s3.py | 11 -----
7 files changed, 3 insertions(+), 192 deletions(-)
diff --git a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
index 5588e5f4b0..bd2034893e 100644
--- a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
@@ -23,14 +23,12 @@ from __future__ import annotations
import json
from copy import copy
-from datetime import datetime
from decimal import Decimal
from os.path import getsize
from tempfile import NamedTemporaryFile
from typing import IO, TYPE_CHECKING, Any, Callable, Sequence
from uuid import uuid4
-from airflow.compat.functools import cached_property
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
@@ -89,10 +87,6 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
:param dynamodb_scan_kwargs: kwargs passed to
<https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.scan>
:param s3_key_prefix: Prefix of s3 object key
:param process_func: How we transform a DynamoDB item to bytes. By default we dump the JSON.
- :param export_time: Time in the past from which to export table data, counted in seconds from the start of
-     the Unix epoch. The table export will be a snapshot of the table's state at this point in time.
- :param export_format: The format for the exported data. Valid values for ExportFormat are DYNAMODB_JSON
-     or ION.
"""
template_fields: Sequence[str] = (
@@ -115,8 +109,6 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
dynamodb_scan_kwargs: dict[str, Any] | None = None,
s3_key_prefix: str = "",
process_func: Callable[[dict[str, Any]], bytes] = _convert_item_to_json_bytes,
- export_time: datetime | None = None,
- export_format: str = "DYNAMODB_JSON",
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -126,43 +118,11 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
self.s3_bucket_name = s3_bucket_name
self.s3_key_prefix = s3_key_prefix
- self.export_time = export_time
- self.export_format = export_format
-
- if self.export_time and self.export_time > datetime.now():
-     raise ValueError("The export_time parameter cannot be a future time.")
-
- @cached_property
- def hook(self):
- """Create DynamoDBHook"""
- return DynamoDBHook(aws_conn_id=self.source_aws_conn_id)
def execute(self, context: Context) -> None:
- if self.export_time:
- self._export_table_to_point_in_time()
- else:
- self._export_entire_data()
-
- def _export_table_to_point_in_time(self):
- """
- Export data from the start of the epoch until `export_time`. The table export will be a snapshot
- of the table's state at this point in time.
- """
- client = self.hook.conn.meta.client
- response = client.export_table_to_point_in_time(
- TableArn=self.dynamodb_table_name,
- ExportTime=self.export_time,
- S3Bucket=self.s3_bucket_name,
- S3Prefix=self.s3_key_prefix,
- ExportFormat=self.export_format,
- )
- waiter = self.hook.get_waiter("export_table")
- export_arn = response.get("ExportDescription", {}).get("ExportArn")
- waiter.wait(ExportArn=export_arn)
-
- def _export_entire_data(self):
- """Export all data from the table."""
- table = self.hook.get_conn().Table(self.dynamodb_table_name)
+ hook = DynamoDBHook(aws_conn_id=self.source_aws_conn_id)
+ table = hook.get_conn().Table(self.dynamodb_table_name)
+
scan_kwargs = copy(self.dynamodb_scan_kwargs) if self.dynamodb_scan_kwargs else {}
err = None
f: IO[Any]
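For context, the reverted branch delegated to DynamoDB's point-in-time export API and then blocked on a custom waiter. Below is a minimal sketch of that call pattern against plain boto3; all table, bucket, and prefix values are hypothetical placeholders, not values from this commit.

    import boto3
    from datetime import datetime, timezone

    # Sketch of the reverted export path; resource names are placeholders.
    client = boto3.client("dynamodb")
    response = client.export_table_to_point_in_time(
        TableArn="arn:aws:dynamodb:us-east-1:123456789012:table/example-table",
        ExportTime=datetime(2023, 4, 10, tzinfo=timezone.utc),
        S3Bucket="example-bucket",
        S3Prefix="exports/",
        ExportFormat="DYNAMODB_JSON",
    )
    # The removed operator then polled this ARN with the custom "export_table" waiter.
    export_arn = response["ExportDescription"]["ExportArn"]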
diff --git a/airflow/providers/amazon/aws/waiters/README.md b/airflow/providers/amazon/aws/waiters/README.md
index d70b969c20..d6e8958b8e 100644
--- a/airflow/providers/amazon/aws/waiters/README.md
+++ b/airflow/providers/amazon/aws/waiters/README.md
@@ -98,5 +98,3 @@
EksHook().get_waiter("all_nodegroups_deleted").wait(clusterName=cluster_name)
Note that since `get_waiter` is on the hook rather than on the client, a custom waiter is
just `hook.get_waiter` and not `hook.conn.get_waiter`. Other than that, they should be identical.
-
-Note the custom waiter doesn't work with resource_type, only client_type is supported.
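To make the hook-versus-client distinction concrete, a short sketch assuming an EksHook as in the example above ("cluster_active" is a built-in boto3 EKS waiter; the cluster name is a placeholder):

    from airflow.providers.amazon.aws.hooks.eks import EksHook

    hook = EksHook()
    # Custom waiter: defined in the provider's JSON files, resolved via the hook.
    hook.get_waiter("all_nodegroups_deleted").wait(clusterName="my-cluster")
    # Built-in boto3 waiter: resolved on the underlying client.
    hook.conn.get_waiter("cluster_active").wait(name="my-cluster")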
diff --git a/airflow/providers/amazon/aws/waiters/dynamodb.json b/airflow/providers/amazon/aws/waiters/dynamodb.json
deleted file mode 100644
index acd23268ab..0000000000
--- a/airflow/providers/amazon/aws/waiters/dynamodb.json
+++ /dev/null
@@ -1,30 +0,0 @@
-{
- "version": 2,
- "waiters": {
- "export_table": {
- "operation": "DescribeExport",
- "delay": 30,
- "maxAttempts": 60,
- "acceptors": [
- {
- "matcher": "path",
- "expected": "COMPLETED",
- "argument": "ExportDescription.ExportStatus",
- "state": "success"
- },
- {
- "matcher": "path",
- "expected": "FAILED",
- "argument": "ExportDescription.ExportStatus",
- "state": "failure"
- },
- {
- "matcher": "path",
- "expected": "IN_PROGRESS",
- "argument": "ExportDescription.ExportStatus",
- "state": "retry"
- }
- ]
- }
- }
-}
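The deleted file above follows botocore's standard waiter schema, so the same config could be exercised directly with botocore. A rough sketch of how such a config is consumed, abbreviated to the success acceptor; this mirrors the provider's BaseBotoWaiter approach but is illustrative, not the provider's exact code:

    import boto3
    from botocore.waiter import WaiterModel, create_waiter_with_client

    # Abbreviated copy of the removed waiter config (success acceptor only).
    config = {
        "version": 2,
        "waiters": {
            "export_table": {
                "operation": "DescribeExport",
                "delay": 30,
                "maxAttempts": 60,
                "acceptors": [
                    {
                        "matcher": "path",
                        "expected": "COMPLETED",
                        "argument": "ExportDescription.ExportStatus",
                        "state": "success",
                    }
                ],
            }
        },
    }
    client = boto3.client("dynamodb")
    waiter = create_waiter_with_client("export_table", WaiterModel(config), client)
    # waiter.wait(ExportArn=...) then polls DescribeExport until COMPLETED.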
diff --git a/docs/apache-airflow-providers-amazon/transfer/dynamodb_to_s3.rst b/docs/apache-airflow-providers-amazon/transfer/dynamodb_to_s3.rst
index 764e63f96d..508f3d4fbc 100644
--- a/docs/apache-airflow-providers-amazon/transfer/dynamodb_to_s3.rst
+++ b/docs/apache-airflow-providers-amazon/transfer/dynamodb_to_s3.rst
@@ -63,14 +63,6 @@ To parallelize the replication, users can create multiple ``DynamoDBToS3Operator
:start-after: [START howto_transfer_dynamodb_to_s3_segmented]
:end-before: [END howto_transfer_dynamodb_to_s3_segmented]
-Users can also pass in ``export_time`` param to ``DynamoDBToS3Operator`` to recover data from a point in time.
-
-.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
- :language: python
- :dedent: 4
- :start-after: [START howto_transfer_dynamodb_to_s3_in_some_point_in_time]
- :end-before: [END howto_transfer_dynamodb_to_s3_in_some_point_in_time]
-
Reference
---------
diff --git a/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py b/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
index 5bb02448f2..e5dd9f607f 100644
--- a/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
+++ b/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
@@ -316,44 +316,3 @@ class TestDynamodbToS3:
assert "2020-01-01" == getattr(operator, "s3_bucket_name")
assert "2020-01-01" == getattr(operator, "dynamodb_table_name")
assert "2020-01-01" == getattr(operator, "s3_key_prefix")
-
- @patch("airflow.providers.amazon.aws.transfers.dynamodb_to_s3.DynamoDBToS3Operator._export_entire_data")
- def test_dynamodb_execute_calling_export_entire_data(self, _export_entire_data):
- """Test that DynamoDBToS3Operator when called without export_time will
call _export_entire_data"""
- dynamodb_to_s3_operator = DynamoDBToS3Operator(
- task_id="dynamodb_to_s3",
- dynamodb_table_name="airflow_rocks",
- s3_bucket_name="airflow-bucket",
- file_size=4000,
- )
- dynamodb_to_s3_operator.execute(context={})
- _export_entire_data.assert_called()
-
- @patch(
-     "airflow.providers.amazon.aws.transfers.dynamodb_to_s3.DynamoDBToS3Operator."
-     "_export_table_to_point_in_time"
- )
- def test_dynamodb_execute_calling_export_table_to_point_in_time(self, _export_table_to_point_in_time):
- """Test that DynamoDBToS3Operator when called without export_time will
call
- _export_table_to_point_in_time. Which implements point in time
recovery logic"""
- dynamodb_to_s3_operator = DynamoDBToS3Operator(
- task_id="dynamodb_to_s3",
- dynamodb_table_name="airflow_rocks",
- s3_bucket_name="airflow-bucket",
- file_size=4000,
- export_time=datetime(year=1983, month=1, day=1),
- )
- dynamodb_to_s3_operator.execute(context={})
- _export_table_to_point_in_time.assert_called()
-
- def test_dynamodb_with_future_date(self):
- """Test that DynamoDBToS3Operator should raise a exception when future
date is passed in
- export_time parameter"""
- with pytest.raises(ValueError, match="The export_time parameter cannot be a future time."):
- DynamoDBToS3Operator(
- task_id="dynamodb_to_s3",
- dynamodb_table_name="airflow_rocks",
- s3_bucket_name="airflow-bucket",
- file_size=4000,
- export_time=datetime(year=3000, month=1, day=1),
- )
diff --git a/tests/providers/amazon/aws/waiters/test_custom_waiters.py b/tests/providers/amazon/aws/waiters/test_custom_waiters.py
index 227ca6be42..5531b6a801 100644
--- a/tests/providers/amazon/aws/waiters/test_custom_waiters.py
+++ b/tests/providers/amazon/aws/waiters/test_custom_waiters.py
@@ -26,7 +26,6 @@ from botocore.exceptions import WaiterError
from botocore.waiter import WaiterModel
from moto import mock_eks
-from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
from airflow.providers.amazon.aws.hooks.ecs import EcsClusterStates, EcsHook, EcsTaskDefinitionStates
from airflow.providers.amazon.aws.hooks.eks import EksHook
from airflow.providers.amazon.aws.waiters.base_waiter import BaseBotoWaiter
@@ -250,59 +249,3 @@ class TestCustomECSServiceWaiters:
]
waiter = EcsHook(aws_conn_id=None).get_waiter("task_definition_inactive")
waiter.wait(taskDefinition="spam-egg", WaiterConfig={"Delay": 0.01, "MaxAttempts": 3})
-
-
-class TestCustomDynamoDBServiceWaiters:
- """Test waiters from ``amazon/aws/waiters/dynamodb.json``."""
-
- STATUS_COMPLETED = "COMPLETED"
- STATUS_FAILED = "FAILED"
- STATUS_IN_PROGRESS = "IN_PROGRESS"
-
- @pytest.fixture(autouse=True)
- def setup_test_cases(self, monkeypatch):
- self.client = boto3.client("dynamodb", region_name="eu-west-3")
- monkeypatch.setattr(DynamoDBHook, "conn", self.client)
-
- @pytest.fixture
- def mock_describe_export(self):
- """Mock ``DynamoDBHook.Client.describe_export`` method."""
- with mock.patch.object(self.client, "describe_export") as m:
- yield m
-
- def test_service_waiters(self):
- hook_waiters = DynamoDBHook(aws_conn_id=None).list_waiters()
- assert "export_table" in hook_waiters
-
- @staticmethod
- def describe_export(status: str):
- """
-     Helper function to generate a minimal DescribeExport response for a single job.
-     https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_DescribeExport.html
- """
- return {"ExportDescription": {"ExportStatus": status}}
-
- def test_export_table_to_point_in_time_completed(self, mock_describe_export):
- """Test state transition from `in progress` to `completed` during
init."""
- with mock.patch("boto3.client") as client:
- client.return_value = self.client
- waiter = DynamoDBHook(aws_conn_id=None).get_waiter("export_table", client=self.client)
- mock_describe_export.side_effect = [
- self.describe_export(self.STATUS_IN_PROGRESS),
- self.describe_export(self.STATUS_COMPLETED),
- ]
- waiter.wait(
-     ExportArn="LoremIpsumissimplydummytextoftheprintingandtypesettingindustry",
- )
-
- def test_export_table_to_point_in_time_failed(self, mock_describe_export):
- """Test state transition from `in progress` to `failed` during init."""
- with mock.patch("boto3.client") as client:
- client.return_value = self.client
- mock_describe_export.side_effect = [
- self.describe_export(self.STATUS_IN_PROGRESS),
- self.describe_export(self.STATUS_FAILED),
- ]
- waiter = DynamoDBHook(aws_conn_id=None).get_waiter("export_table", client=self.client)
- with pytest.raises(WaiterError, match='we matched expected path: "FAILED"'):
-     waiter.wait(ExportArn="LoremIpsumissimplydummytextoftheprintingandtypesettingindustry")
diff --git a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
index a90cba6ba0..b56efaf2ce 100644
--- a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
+++ b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
@@ -127,16 +127,6 @@ with DAG(
},
)
# [END howto_transfer_dynamodb_to_s3_segmented]
-
- # [START howto_transfer_dynamodb_to_s3_in_some_point_in_time]
- backup_db_to_point_in_time = DynamoDBToS3Operator(
- task_id="backup_db_to_point_in_time",
- dynamodb_table_name=table_name,
- s3_bucket_name=bucket_name,
- export_time=datetime(year=2023, month=4, day=10),
- )
- # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time]
-
delete_table = delete_dynamodb_table(table_name=table_name)
delete_bucket = S3DeleteBucketOperator(
@@ -156,7 +146,6 @@ with DAG(
backup_db,
backup_db_segment_1,
backup_db_segment_2,
- backup_db_to_point_in_time,
# TEST TEARDOWN
delete_table,
delete_bucket,