This is an automated email from the ASF dual-hosted git repository.

onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new dff7e0de36 Revert "DynamoDBToS3Operator - Add feature to export table to a point in time (#30501)" (#31139)
dff7e0de36 is described below

commit dff7e0de362e4cd318d7c285ec102923503eceb3
Author: D. Ferruzzi <[email protected]>
AuthorDate: Mon May 8 15:38:52 2023 -0700

    Revert "DynamoDBToS3Operator - Add feature to export table to a point in 
time (#30501)" (#31139)
    
    This reverts commit fc4166127a1d2099d358fee1ea10662838cf9cf3.
---
 .../amazon/aws/transfers/dynamodb_to_s3.py         | 46 ++---------------
 airflow/providers/amazon/aws/waiters/README.md     |  2 -
 airflow/providers/amazon/aws/waiters/dynamodb.json | 30 ------------
 .../transfer/dynamodb_to_s3.rst                    |  8 ---
 .../amazon/aws/transfers/test_dynamodb_to_s3.py    | 41 ----------------
 .../amazon/aws/waiters/test_custom_waiters.py      | 57 ----------------------
 .../providers/amazon/aws/example_dynamodb_to_s3.py | 11 -----
 7 files changed, 3 insertions(+), 192 deletions(-)

diff --git a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
index 5588e5f4b0..bd2034893e 100644
--- a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
@@ -23,14 +23,12 @@ from __future__ import annotations
 
 import json
 from copy import copy
-from datetime import datetime
 from decimal import Decimal
 from os.path import getsize
 from tempfile import NamedTemporaryFile
 from typing import IO, TYPE_CHECKING, Any, Callable, Sequence
 from uuid import uuid4
 
-from airflow.compat.functools import cached_property
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
 from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
@@ -89,10 +87,6 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
     :param dynamodb_scan_kwargs: kwargs pass to <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.scan>
     :param s3_key_prefix: Prefix of s3 object key
     :param process_func: How we transforms a dynamodb item to bytes. By default we dump the json
-    :param export_time: Time in the past from which to export table data, counted in seconds from the start of
-     the Unix epoch. The table export will be a snapshot of the table's state at this point in time.
-    :param export_format: The format for the exported data. Valid values for ExportFormat are DYNAMODB_JSON
-     or ION.
     """
 
     template_fields: Sequence[str] = (
@@ -115,8 +109,6 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
         dynamodb_scan_kwargs: dict[str, Any] | None = None,
         s3_key_prefix: str = "",
         process_func: Callable[[dict[str, Any]], bytes] = _convert_item_to_json_bytes,
-        export_time: datetime | None = None,
-        export_format: str = "DYNAMODB_JSON",
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -126,43 +118,11 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
         self.dynamodb_scan_kwargs = dynamodb_scan_kwargs
         self.s3_bucket_name = s3_bucket_name
         self.s3_key_prefix = s3_key_prefix
-        self.export_time = export_time
-        self.export_format = export_format
-
-        if self.export_time and self.export_time > datetime.now():
-            raise ValueError("The export_time parameter cannot be a future time.")
-
-    @cached_property
-    def hook(self):
-        """Create DynamoDBHook"""
-        return DynamoDBHook(aws_conn_id=self.source_aws_conn_id)
 
     def execute(self, context: Context) -> None:
-        if self.export_time:
-            self._export_table_to_point_in_time()
-        else:
-            self._export_entire_data()
-
-    def _export_table_to_point_in_time(self):
-        """
-        Export data from start of epoc till `export_time`. Table export will be a snapshot of the table's
-         state at this point in time.
-        """
-        client = self.hook.conn.meta.client
-        response = client.export_table_to_point_in_time(
-            TableArn=self.dynamodb_table_name,
-            ExportTime=self.export_time,
-            S3Bucket=self.s3_bucket_name,
-            S3Prefix=self.s3_key_prefix,
-            ExportFormat=self.export_format,
-        )
-        waiter = self.hook.get_waiter("export_table")
-        export_arn = response.get("ExportDescription", {}).get("ExportArn")
-        waiter.wait(ExportArn=export_arn)
-
-    def _export_entire_data(self):
-        """Export all data from the table."""
-        table = self.hook.get_conn().Table(self.dynamodb_table_name)
+        hook = DynamoDBHook(aws_conn_id=self.source_aws_conn_id)
+        table = hook.get_conn().Table(self.dynamodb_table_name)
+
         scan_kwargs = copy(self.dynamodb_scan_kwargs) if self.dynamodb_scan_kwargs else {}
         err = None
         f: IO[Any]
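
For reference, the removed _export_table_to_point_in_time path above was a thin wrapper around DynamoDB's ExportTableToPointInTime API followed by the custom "export_table" waiter deleted further down in this commit. A rough standalone sketch of that flow with plain boto3 (the table ARN, bucket, and prefix are placeholders, and the polling loop stands in for the deleted waiter) might look like:

    import time
    from datetime import datetime, timedelta

    import boto3

    client = boto3.client("dynamodb")

    # Requires point-in-time recovery to be enabled on the table.
    # Placeholder identifiers -- substitute real values.
    response = client.export_table_to_point_in_time(
        TableArn="arn:aws:dynamodb:us-east-1:123456789012:table/my_table",
        ExportTime=datetime.now() - timedelta(hours=1),
        S3Bucket="my-export-bucket",
        S3Prefix="exports/",
        ExportFormat="DYNAMODB_JSON",
    )
    export_arn = response["ExportDescription"]["ExportArn"]

    # Poll DescribeExport until the export finishes, mirroring the removed waiter's
    # COMPLETED / FAILED / IN_PROGRESS acceptors.
    while True:
        status = client.describe_export(ExportArn=export_arn)["ExportDescription"]["ExportStatus"]
        if status == "COMPLETED":
            break
        if status == "FAILED":
            raise RuntimeError("DynamoDB export failed")
        time.sleep(30)
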
diff --git a/airflow/providers/amazon/aws/waiters/README.md b/airflow/providers/amazon/aws/waiters/README.md
index d70b969c20..d6e8958b8e 100644
--- a/airflow/providers/amazon/aws/waiters/README.md
+++ b/airflow/providers/amazon/aws/waiters/README.md
@@ -98,5 +98,3 @@ EksHook().get_waiter("all_nodegroups_deleted").wait(clusterName=cluster_name)
 
 Note that since the get_waiter is in the hook instead of on the client side, a custom waiter is
 just `hook.get_waiter` and not `hook.conn.get_waiter`.  Other than that, they should be identical.
-
-Note the custom waiter doesn't work with resource_type, only client_type is supported.
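
To illustrate the distinction the README keeps above: a stock boto3 waiter is fetched from the underlying client, while a provider-defined custom waiter is fetched from the hook itself. A minimal sketch, assuming a placeholder cluster name (the "cluster_active" waiter is a standard boto3 EKS waiter; "all_nodegroups_deleted" is the custom one quoted in the README):

    from airflow.providers.amazon.aws.hooks.eks import EksHook

    hook = EksHook()

    # Built-in boto3 waiter: comes from the client (hook.conn).
    hook.conn.get_waiter("cluster_active").wait(name="my-cluster")

    # Custom waiter shipped with the provider: comes from the hook itself.
    hook.get_waiter("all_nodegroups_deleted").wait(clusterName="my-cluster")
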
diff --git a/airflow/providers/amazon/aws/waiters/dynamodb.json b/airflow/providers/amazon/aws/waiters/dynamodb.json
deleted file mode 100644
index acd23268ab..0000000000
--- a/airflow/providers/amazon/aws/waiters/dynamodb.json
+++ /dev/null
@@ -1,30 +0,0 @@
-{
-    "version": 2,
-    "waiters": {
-        "export_table": {
-            "operation": "DescribeExport",
-            "delay": 30,
-            "maxAttempts": 60,
-            "acceptors": [
-                {
-                    "matcher": "path",
-                    "expected": "COMPLETED",
-                    "argument": "ExportDescription.ExportStatus",
-                    "state": "success"
-                },
-                {
-                    "matcher": "path",
-                    "expected": "FAILED",
-                    "argument": "ExportDescription.ExportStatus",
-                    "state": "failure"
-                },
-                {
-                    "matcher": "path",
-                    "expected": "IN_PROGRESS",
-                    "argument": "ExportDescription.ExportStatus",
-                    "state": "retry"
-                }
-            ]
-        }
-    }
-}
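
The deleted JSON above follows the standard botocore waiter schema: a DescribeExport polling loop that succeeds on COMPLETED, fails on FAILED, and retries on IN_PROGRESS. Independently of the provider's BaseBotoWaiter plumbing, such a file could be loaded by hand roughly like this (the file path and ExportArn are placeholders):

    import json

    import boto3
    from botocore.waiter import WaiterModel, create_waiter_with_client

    with open("dynamodb.json") as f:
        waiter_config = json.load(f)

    client = boto3.client("dynamodb")
    model = WaiterModel(waiter_config)

    # Builds a waiter that polls DescribeExport every 30 seconds, up to 60 attempts.
    waiter = create_waiter_with_client("export_table", model, client)
    waiter.wait(ExportArn="arn:aws:dynamodb:us-east-1:123456789012:table/my_table/export/0123456789")
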
diff --git a/docs/apache-airflow-providers-amazon/transfer/dynamodb_to_s3.rst b/docs/apache-airflow-providers-amazon/transfer/dynamodb_to_s3.rst
index 764e63f96d..508f3d4fbc 100644
--- a/docs/apache-airflow-providers-amazon/transfer/dynamodb_to_s3.rst
+++ b/docs/apache-airflow-providers-amazon/transfer/dynamodb_to_s3.rst
@@ -63,14 +63,6 @@ To parallelize the replication, users can create multiple ``DynamoDBToS3Operator
     :start-after: [START howto_transfer_dynamodb_to_s3_segmented]
     :end-before: [END howto_transfer_dynamodb_to_s3_segmented]
 
-Users can also pass in ``export_time`` param to ``DynamoDBToS3Operator`` to recover data from a point in time.
-
-.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
-    :language: python
-    :dedent: 4
-    :start-after: [START howto_transfer_dynamodb_to_s3_in_some_point_in_time]
-    :end-before: [END howto_transfer_dynamodb_to_s3_in_some_point_in_time]
-
 Reference
 ---------
 
diff --git a/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py b/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
index 5bb02448f2..e5dd9f607f 100644
--- a/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
+++ b/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
@@ -316,44 +316,3 @@ class TestDynamodbToS3:
         assert "2020-01-01" == getattr(operator, "s3_bucket_name")
         assert "2020-01-01" == getattr(operator, "dynamodb_table_name")
         assert "2020-01-01" == getattr(operator, "s3_key_prefix")
-
-    @patch("airflow.providers.amazon.aws.transfers.dynamodb_to_s3.DynamoDBToS3Operator._export_entire_data")
-    def test_dynamodb_execute_calling_export_entire_data(self, _export_entire_data):
-        """Test that DynamoDBToS3Operator when called without export_time will call _export_entire_data"""
-        dynamodb_to_s3_operator = DynamoDBToS3Operator(
-            task_id="dynamodb_to_s3",
-            dynamodb_table_name="airflow_rocks",
-            s3_bucket_name="airflow-bucket",
-            file_size=4000,
-        )
-        dynamodb_to_s3_operator.execute(context={})
-        _export_entire_data.assert_called()
-
-    @patch(
-        "airflow.providers.amazon.aws.transfers.dynamodb_to_s3.DynamoDBToS3Operator."
-        "_export_table_to_point_in_time"
-    )
-    def test_dynamodb_execute_calling_export_table_to_point_in_time(self, _export_table_to_point_in_time):
-        """Test that DynamoDBToS3Operator when called without export_time will call
-        _export_table_to_point_in_time. Which implements point in time recovery logic"""
-        dynamodb_to_s3_operator = DynamoDBToS3Operator(
-            task_id="dynamodb_to_s3",
-            dynamodb_table_name="airflow_rocks",
-            s3_bucket_name="airflow-bucket",
-            file_size=4000,
-            export_time=datetime(year=1983, month=1, day=1),
-        )
-        dynamodb_to_s3_operator.execute(context={})
-        _export_table_to_point_in_time.assert_called()
-
-    def test_dynamodb_with_future_date(self):
-        """Test that DynamoDBToS3Operator should raise a exception when future 
date is passed in
-        export_time parameter"""
-        with pytest.raises(ValueError, match="The export_time parameter cannot 
be a future time."):
-            DynamoDBToS3Operator(
-                task_id="dynamodb_to_s3",
-                dynamodb_table_name="airflow_rocks",
-                s3_bucket_name="airflow-bucket",
-                file_size=4000,
-                export_time=datetime(year=3000, month=1, day=1),
-            )
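
The removed tests above exercised the reverted export_time behaviour: a past timestamp routed execution to the point-in-time export, and a future timestamp raised ValueError at construction. For context, a DAG task using the reverted parameter (table and bucket names are placeholders; this no longer works on main after this commit) looked roughly like:

    from datetime import datetime

    from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator

    # Reverted usage: export_time selected a point-in-time snapshot; omitting it
    # fell back to a full table scan. A future export_time raised ValueError.
    backup_db_to_point_in_time = DynamoDBToS3Operator(
        task_id="backup_db_to_point_in_time",
        dynamodb_table_name="my_table",
        s3_bucket_name="my-bucket",
        export_time=datetime(2023, 4, 10),
    )
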
diff --git a/tests/providers/amazon/aws/waiters/test_custom_waiters.py b/tests/providers/amazon/aws/waiters/test_custom_waiters.py
index 227ca6be42..5531b6a801 100644
--- a/tests/providers/amazon/aws/waiters/test_custom_waiters.py
+++ b/tests/providers/amazon/aws/waiters/test_custom_waiters.py
@@ -26,7 +26,6 @@ from botocore.exceptions import WaiterError
 from botocore.waiter import WaiterModel
 from moto import mock_eks
 
-from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
 from airflow.providers.amazon.aws.hooks.ecs import EcsClusterStates, EcsHook, EcsTaskDefinitionStates
 from airflow.providers.amazon.aws.hooks.eks import EksHook
 from airflow.providers.amazon.aws.waiters.base_waiter import BaseBotoWaiter
@@ -250,59 +249,3 @@ class TestCustomECSServiceWaiters:
         ]
         waiter = EcsHook(aws_conn_id=None).get_waiter("task_definition_inactive")
         waiter.wait(taskDefinition="spam-egg", WaiterConfig={"Delay": 0.01, "MaxAttempts": 3})
-
-
-class TestCustomDynamoDBServiceWaiters:
-    """Test waiters from ``amazon/aws/waiters/dynamodb.json``."""
-
-    STATUS_COMPLETED = "COMPLETED"
-    STATUS_FAILED = "FAILED"
-    STATUS_IN_PROGRESS = "IN_PROGRESS"
-
-    @pytest.fixture(autouse=True)
-    def setup_test_cases(self, monkeypatch):
-        self.client = boto3.client("dynamodb", region_name="eu-west-3")
-        monkeypatch.setattr(DynamoDBHook, "conn", self.client)
-
-    @pytest.fixture
-    def mock_describe_export(self):
-        """Mock ``DynamoDBHook.Client.describe_export`` method."""
-        with mock.patch.object(self.client, "describe_export") as m:
-            yield m
-
-    def test_service_waiters(self):
-        hook_waiters = DynamoDBHook(aws_conn_id=None).list_waiters()
-        assert "export_table" in hook_waiters
-
-    @staticmethod
-    def describe_export(status: str):
-        """
-        Helper function for generate minimal DescribeExport response for single job.
-        https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_DescribeExport.html
-        """
-        return {"ExportDescription": {"ExportStatus": status}}
-
-    def test_export_table_to_point_in_time_completed(self, mock_describe_export):
-        """Test state transition from `in progress` to `completed` during init."""
-        with mock.patch("boto3.client") as client:
-            client.return_value = self.client
-            waiter = DynamoDBHook(aws_conn_id=None).get_waiter("export_table", 
client=self.client)
-            mock_describe_export.side_effect = [
-                self.describe_export(self.STATUS_IN_PROGRESS),
-                self.describe_export(self.STATUS_COMPLETED),
-            ]
-            waiter.wait(
-                ExportArn="LoremIpsumissimplydummytextoftheprintingandtypesettingindustry",
-            )
-
-    def test_export_table_to_point_in_time_failed(self, mock_describe_export):
-        """Test state transition from `in progress` to `failed` during init."""
-        with mock.patch("boto3.client") as client:
-            client.return_value = self.client
-            mock_describe_export.side_effect = [
-                self.describe_export(self.STATUS_IN_PROGRESS),
-                self.describe_export(self.STATUS_FAILED),
-            ]
-            waiter = DynamoDBHook(aws_conn_id=None).get_waiter("export_table", 
client=self.client)
-            with pytest.raises(WaiterError, match='we matched expected path: 
"FAILED"'):
-                
waiter.wait(ExportArn="LoremIpsumissimplydummytextoftheprintingandtypesettingindustry")
diff --git a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
index a90cba6ba0..b56efaf2ce 100644
--- a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
+++ b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
@@ -127,16 +127,6 @@ with DAG(
         },
     )
     # [END howto_transfer_dynamodb_to_s3_segmented]
-
-    # [START howto_transfer_dynamodb_to_s3_in_some_point_in_time]
-    backup_db_to_point_in_time = DynamoDBToS3Operator(
-        task_id="backup_db_to_point_in_time",
-        dynamodb_table_name=table_name,
-        s3_bucket_name=bucket_name,
-        export_time=datetime(year=2023, month=4, day=10),
-    )
-    # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time]
-
     delete_table = delete_dynamodb_table(table_name=table_name)
 
     delete_bucket = S3DeleteBucketOperator(
@@ -156,7 +146,6 @@ with DAG(
         backup_db,
         backup_db_segment_1,
         backup_db_segment_2,
-        backup_db_to_point_in_time,
         # TEST TEARDOWN
         delete_table,
         delete_bucket,
