This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4a37777567 Glue `DataBrew` operator (#34807)
4a37777567 is described below
commit 4a377775672b7148e8935e20844e7a0ba491bdd8
Author: ellisms <[email protected]>
AuthorDate: Mon Oct 16 16:29:05 2023 -0400
Glue `DataBrew` operator (#34807)
---
.../providers/amazon/aws/hooks/glue_databrew.py | 68 +++++++++++++
.../amazon/aws/operators/glue_databrew.py | 110 +++++++++++++++++++++
.../providers/amazon/aws/triggers/glue_databrew.py | 59 +++++++++++
airflow/providers/amazon/aws/waiters/databrew.json | 36 +++++++
airflow/providers/amazon/provider.yaml | 15 +++
.../operators/glue.rst | 1 +
.../operators/glue_databrew.rst | 53 ++++++++++
.../integration-logos/aws/AWS-Glue-DataBrew_64.png | Bin 0 -> 14575 bytes
.../amazon/aws/hooks/test_glue_databrew.py | 38 +++++++
.../amazon/aws/operators/test_glue_databrew.py | 58 +++++++++++
.../amazon/aws/triggers/test_glue_databrew.py | 46 +++++++++
.../amazon/aws/waiters/test_glue_databrew.py | 68 +++++++++++++
.../providers/amazon/aws/example_glue_databrew.py | 55 +++++++++++
13 files changed, 607 insertions(+)
diff --git a/airflow/providers/amazon/aws/hooks/glue_databrew.py b/airflow/providers/amazon/aws/hooks/glue_databrew.py
new file mode 100644
index 0000000000..f6c7b3ebd6
--- /dev/null
+++ b/airflow/providers/amazon/aws/hooks/glue_databrew.py
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class GlueDataBrewHook(AwsBaseHook):
+ """
+ Interact with AWS DataBrew.
+
+ Additional arguments (such as ``aws_conn_id``) may be specified and
+ are passed down to the underlying AwsBaseHook.
+
+ .. seealso::
+ - :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+ """
+
+ def __init__(self, *args, **kwargs):
+ kwargs["client_type"] = "databrew"
+ super().__init__(*args, **kwargs)
+
+ def job_completion(self, job_name: str, run_id: str, delay: int = 10, max_attempts: int = 60) -> str:
+ """
+ Wait until Glue DataBrew job reaches terminal status.
+
+ :param job_name: The name of the job being processed during this run.
+ :param run_id: The unique identifier of the job run.
+ :param delay: Time in seconds to delay between polls
+ :param maxAttempts: Maximum number of attempts to poll for completion
+ :return: job status
+ """
+ self.get_waiter("job_complete").wait(
+ Name=job_name,
+ RunId=run_id,
+ WaiterConfig={"Delay": delay, "MaxAttempts": max_attempts},
+ )
+
+ status = self.get_job_state(job_name, run_id)
+ return status
+
+ def get_job_state(self, job_name: str, run_id: str) -> str:
+ """
+ Get the status of a job run.
+
+ :param job_name: The name of the job being processed during this run.
+ :param run_id: The unique identifier of the job run.
+ :return: State of the job run.
+
'STARTING'|'RUNNING'|'STOPPING'|'STOPPED'|'SUCCEEDED'|'FAILED'|'TIMEOUT'
+ """
+ response = self.conn.describe_job_run(Name=job_name, RunId=run_id)
+ return response["State"]
diff --git a/airflow/providers/amazon/aws/operators/glue_databrew.py b/airflow/providers/amazon/aws/operators/glue_databrew.py
new file mode 100644
index 0000000000..596a507397
--- /dev/null
+++ b/airflow/providers/amazon/aws/operators/glue_databrew.py
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from functools import cached_property
+from typing import TYPE_CHECKING, Sequence
+
+from airflow.configuration import conf
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.glue_databrew import GlueDataBrewHook
+from airflow.providers.amazon.aws.triggers.glue_databrew import GlueDataBrewJobCompleteTrigger
+
+if TYPE_CHECKING:
+ from airflow.utils.context import Context
+
+
+class GlueDataBrewStartJobOperator(BaseOperator):
+ """
+ Start an AWS Glue DataBrew job.
+
+ AWS Glue DataBrew is a visual data preparation tool that makes it easier
+ for data analysts and data scientists to clean and normalize data
+ to prepare it for analytics and machine learning (ML).
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:GlueDataBrewStartJobOperator`
+
+ :param job_name: unique job name per AWS Account
+ :param wait_for_completion: Whether to wait for job run completion. (default: True)
+ :param deferrable: If True, the operator will wait asynchronously for the job to complete.
+ This implies waiting for completion. This mode requires aiobotocore module to be installed.
+ (default: False)
+ :param delay: Time in seconds to wait between status checks. Default is 30.
+ :return: dictionary with key ``run_id`` holding the run ID of the started job.
+ """
+
+ template_fields: Sequence[str] = (
+ "job_name",
+ "wait_for_completion",
+ "delay",
+ "deferrable",
+ )
+
+ def __init__(
+ self,
+ job_name: str,
+ wait_for_completion: bool = True,
+ delay: int = 30,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
+ aws_conn_id: str = "aws_default",
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+ self.job_name = job_name
+ self.wait_for_completion = wait_for_completion
+ self.deferrable = deferrable
+ self.delay = delay
+ self.aws_conn_id = aws_conn_id
+
+ @cached_property
+ def hook(self) -> GlueDataBrewHook:
+ return GlueDataBrewHook(aws_conn_id=self.aws_conn_id)
+
+ def execute(self, context: Context):
+ job = self.hook.conn.start_job_run(Name=self.job_name)
+ run_id = job["RunId"]
+
+ self.log.info("AWS Glue DataBrew Job: %s. Run Id: %s submitted.",
self.job_name, run_id)
+
+ if self.deferrable:
+ self.log.info("Deferring job %s with run_id %s", self.job_name,
run_id)
+ self.defer(
+ trigger=GlueDataBrewJobCompleteTrigger(
+ aws_conn_id=self.aws_conn_id, job_name=self.job_name, run_id=run_id, delay=self.delay
+ ),
+ method_name="execute_complete",
+ )
+
+ elif self.wait_for_completion:
+ self.log.info(
+ "Waiting for AWS Glue DataBrew Job: %s. Run Id: %s to
complete.", self.job_name, run_id
+ )
+ status = self.hook.job_completion(job_name=self.job_name, delay=self.delay, run_id=run_id)
+ self.log.info("Glue DataBrew Job: %s status: %s", self.job_name,
status)
+
+ return {"run_id": run_id}
+
+ def execute_complete(self, context: Context, event=None) -> dict[str, str]:
+ run_id = event.get("run_id", "")
+ status = event.get("status", "")
+
+ self.log.info("AWS Glue DataBrew runID: %s completed with status: %s",
run_id, status)
+
+ return {"run_id": run_id}
diff --git a/airflow/providers/amazon/aws/triggers/glue_databrew.py b/airflow/providers/amazon/aws/triggers/glue_databrew.py
new file mode 100644
index 0000000000..595b653e2f
--- /dev/null
+++ b/airflow/providers/amazon/aws/triggers/glue_databrew.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from airflow.providers.amazon.aws.hooks.glue_databrew import GlueDataBrewHook
+from airflow.providers.amazon.aws.triggers.base import AwsBaseWaiterTrigger
+
+
+class GlueDataBrewJobCompleteTrigger(AwsBaseWaiterTrigger):
+ """
+ Watches a Glue DataBrew job and triggers when it finishes.
+
+ :param job_name: Glue DataBrew job name
+ :param run_id: the ID of the specific run to watch for that job
+ :param delay: Number of seconds to wait between two checks. Default is 10 seconds.
+ :param max_attempts: Maximum number of attempts to wait for the job to complete. Default is 60 attempts.
+ :param aws_conn_id: The Airflow connection used for AWS credentials.
+ """
+
+ def __init__(
+ self,
+ job_name: str,
+ run_id: str,
+ aws_conn_id: str,
+ delay: int = 10,
+ max_attempts: int = 60,
+ **kwargs,
+ ):
+ super().__init__(
+ serialized_fields={"job_name": job_name, "run_id": run_id},
+ waiter_name="job_complete",
+ waiter_args={"Name": job_name, "RunId": run_id},
+ failure_message=f"Error while waiting for job {job_name} with run
id {run_id} to complete",
+ status_message=f"Run id: {run_id}",
+ status_queries=["State"],
+ return_value=run_id,
+ return_key="run_id",
+ waiter_delay=delay,
+ waiter_max_attempts=max_attempts,
+ aws_conn_id=aws_conn_id,
+ )
+
+ def hook(self) -> GlueDataBrewHook:
+ return GlueDataBrewHook(aws_conn_id=self.aws_conn_id)
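
For reference, a sketch of what the operator's defer() call constructs and how the triggerer rebuilds it (all argument values hypothetical):

    from airflow.providers.amazon.aws.triggers.glue_databrew import GlueDataBrewJobCompleteTrigger

    trigger = GlueDataBrewJobCompleteTrigger(
        aws_conn_id="aws_default", job_name="my-databrew-job", run_id="run-123", delay=15
    )
    # AwsBaseWaiterTrigger serializes to a (classpath, kwargs) pair; the triggerer
    # re-instantiates the class from those kwargs and runs the async waiter loop
    classpath, kwargs = trigger.serialize()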
diff --git a/airflow/providers/amazon/aws/waiters/databrew.json b/airflow/providers/amazon/aws/waiters/databrew.json
new file mode 100644
index 0000000000..41372def5b
--- /dev/null
+++ b/airflow/providers/amazon/aws/waiters/databrew.json
@@ -0,0 +1,36 @@
+{
+ "version": 2,
+ "waiters": {
+ "job_complete": {
+ "operation": "DescribeJobRun",
+ "delay": 30,
+ "maxAttempts": 60,
+ "acceptors": [
+ {
+ "matcher": "path",
+ "argument": "State",
+ "expected": "STOPPED",
+ "state": "success"
+ },
+ {
+ "matcher": "path",
+ "argument": "State",
+ "expected": "SUCCEEDED",
+ "state": "success"
+ },
+ {
+ "matcher": "path",
+ "argument": "State",
+ "expected": "FAILED",
+ "state": "success"
+ },
+ {
+ "matcher": "path",
+ "argument": "State",
+ "expected": "TIMEOUT",
+ "state": "success"
+ }
+ ]
+ }
+ }
+}
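
This custom waiter is resolved by name through the hook; a sketch of the equivalent direct call (job name and run id are hypothetical):

    from airflow.providers.amazon.aws.hooks.glue_databrew import GlueDataBrewHook

    waiter = GlueDataBrewHook().get_waiter("job_complete")  # loaded from waiters/databrew.json
    # all four terminal states are modelled as waiter "success"; callers read the final
    # State from DescribeJobRun themselves (see GlueDataBrewHook.get_job_state)
    waiter.wait(Name="my-databrew-job", RunId="run-123", WaiterConfig={"Delay": 30, "MaxAttempts": 60})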
diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml
index 74ada23355..0949232bd2 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -289,6 +289,12 @@ integrations:
how-to-guide:
- /docs/apache-airflow-providers-amazon/operators/appflow.rst
tags: [aws]
+ - integration-name: AWS Glue DataBrew
+ external-doc-url: https://docs.aws.amazon.com/databrew/latest/dg/what-is.html
+ how-to-guide:
+ - /docs/apache-airflow-providers-amazon/operators/glue_databrew.rst
+ logo: /integration-logos/aws/AWS-Glue-DataBrew_64.png
+ tags: [aws]
operators:
- integration-name: Amazon Athena
@@ -365,6 +371,9 @@ operators:
- integration-name: Amazon Appflow
python-modules:
- airflow.providers.amazon.aws.operators.appflow
+ - integration-name: AWS Glue DataBrew
+ python-modules:
+ - airflow.providers.amazon.aws.operators.glue_databrew
sensors:
- integration-name: Amazon Athena
@@ -541,6 +550,9 @@ hooks:
- integration-name: Amazon Appflow
python-modules:
- airflow.providers.amazon.aws.hooks.appflow
+ - integration-name: AWS Glue DataBrew
+ python-modules:
+ - airflow.providers.amazon.aws.hooks.glue_databrew
triggers:
- integration-name: Amazon Web Services
@@ -589,6 +601,9 @@ triggers:
- integration-name: Amazon Simple Queue Service (SQS)
python-modules:
- airflow.providers.amazon.aws.triggers.sqs
+ - integration-name: AWS Glue DataBrew
+ python-modules:
+ - airflow.providers.amazon.aws.triggers.glue_databrew
transfers:
- source-integration-name: Amazon DynamoDB
diff --git a/docs/apache-airflow-providers-amazon/operators/glue.rst b/docs/apache-airflow-providers-amazon/operators/glue.rst
index e582e9d415..ddd21205c1 100644
--- a/docs/apache-airflow-providers-amazon/operators/glue.rst
+++ b/docs/apache-airflow-providers-amazon/operators/glue.rst
@@ -105,3 +105,4 @@ Reference
* `AWS boto3 library documentation for Glue <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html>`__
* `Glue IAM Role creation <https://docs.aws.amazon.com/glue/latest/dg/create-an-iam-role.html>`__
+* `AWS boto3 library documentation for Glue DataBrew <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/databrew.html>`__
diff --git a/docs/apache-airflow-providers-amazon/operators/glue_databrew.rst b/docs/apache-airflow-providers-amazon/operators/glue_databrew.rst
new file mode 100644
index 0000000000..d14f898486
--- /dev/null
+++ b/docs/apache-airflow-providers-amazon/operators/glue_databrew.rst
@@ -0,0 +1,53 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+=================
+AWS Glue DataBrew
+=================
+
+`AWS Glue DataBrew <https://aws.amazon.com/glue/features/databrew/>`__ is a visual data preparation tool
+that makes it easier for data analysts and data scientists to clean and normalize data to prepare it
+for analytics and machine learning (ML). You can choose from over 250 prebuilt transformations to automate
+data preparation tasks, all without the need to write any code. You can automate filtering anomalies,
+converting data to standard formats, correcting invalid values, and other tasks.
+After your data is ready, you can immediately use it for analytics and ML projects.
+
+Prerequisite Tasks
+------------------
+
+.. include:: ../_partials/prerequisite_tasks.rst
+
+Operators
+---------
+
+.. _howto/operator:GlueDataBrewStartJobOperator:
+
+Start an AWS Glue DataBrew job
+==============================
+
+To submit a new AWS Glue DataBrew job you can use :class:`~airflow.providers.amazon.aws.operators.glue_databrew.GlueDataBrewStartJobOperator`.
+
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_glue_databrew.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_glue_databrew_start]
+ :end-before: [END howto_operator_glue_databrew_start]
+
+Reference
+---------
+
+* `AWS boto3 library documentation for Glue DataBrew <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/databrew.html>`__
diff --git a/docs/integration-logos/aws/AWS-Glue-DataBrew_64.png b/docs/integration-logos/aws/AWS-Glue-DataBrew_64.png
new file mode 100644
index 0000000000..9821fcdf26
Binary files /dev/null and b/docs/integration-logos/aws/AWS-Glue-DataBrew_64.png differ
diff --git a/tests/providers/amazon/aws/hooks/test_glue_databrew.py b/tests/providers/amazon/aws/hooks/test_glue_databrew.py
new file mode 100644
index 0000000000..e3ea047f90
--- /dev/null
+++ b/tests/providers/amazon/aws/hooks/test_glue_databrew.py
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+from unittest import mock
+
+from airflow.providers.amazon.aws.hooks.glue_databrew import GlueDataBrewHook
+
+if TYPE_CHECKING:
+ from unittest.mock import MagicMock
+
+
+class TestGlueDataBrewHook:
+ job_name = "test-databrew-job"
+ run_id = "test12345"
+
+ @mock.patch.object(GlueDataBrewHook, "get_job_state")
+ def test_get_job_state(self, get_job_state_mock: MagicMock):
+ get_job_state_mock.return_value = "SUCCEEDED"
+ hook = GlueDataBrewHook()
+ result = hook.get_job_state(self.job_name, self.run_id)
+ assert result == "SUCCEEDED"
diff --git a/tests/providers/amazon/aws/operators/test_glue_databrew.py b/tests/providers/amazon/aws/operators/test_glue_databrew.py
new file mode 100644
index 0000000000..571e4816b5
--- /dev/null
+++ b/tests/providers/amazon/aws/operators/test_glue_databrew.py
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+import pytest
+from moto import mock_databrew
+
+from airflow.providers.amazon.aws.hooks.glue_databrew import GlueDataBrewHook
+from airflow.providers.amazon.aws.operators.glue_databrew import GlueDataBrewStartJobOperator
+
+JOB_NAME = "test_job"
+
+
+@pytest.fixture
+def hook() -> GlueDataBrewHook:
+ with mock_databrew():
+ yield GlueDataBrewHook(aws_conn_id="aws_default")
+
+
+class TestGlueDataBrewOperator:
+ @mock.patch.object(GlueDataBrewHook, "conn")
+ @mock.patch.object(GlueDataBrewHook, "get_waiter")
+ def test_start_job_wait_for_completion(self, mock_hook_get_waiter, mock_conn):
+ TEST_RUN_ID = "12345"
+ operator = GlueDataBrewStartJobOperator(
+ task_id="task_test", job_name=JOB_NAME, wait_for_completion=True, aws_conn_id="aws_default"
+ )
+ mock_conn.start_job_run.return_value = {"RunId": TEST_RUN_ID}
+ operator.execute(None)
+ mock_hook_get_waiter.assert_called_once_with("job_complete")
+
+ @mock.patch.object(GlueDataBrewHook, "conn")
+ @mock.patch.object(GlueDataBrewHook, "get_waiter")
+ def test_start_job_no_wait(self, mock_hook_get_waiter, mock_conn):
+ TEST_RUN_ID = "12345"
+ operator = GlueDataBrewStartJobOperator(
+ task_id="task_test", job_name=JOB_NAME, wait_for_completion=False, aws_conn_id="aws_default"
+ )
+ mock_conn.start_job_run.return_value = {"RunId": TEST_RUN_ID}
+ operator.execute(None)
+ mock_hook_get_waiter.assert_not_called()
diff --git a/tests/providers/amazon/aws/triggers/test_glue_databrew.py b/tests/providers/amazon/aws/triggers/test_glue_databrew.py
new file mode 100644
index 0000000000..7352fffcd8
--- /dev/null
+++ b/tests/providers/amazon/aws/triggers/test_glue_databrew.py
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.providers.amazon.aws.triggers.glue_databrew import GlueDataBrewJobCompleteTrigger
+
+TEST_JOB_NAME = "test_job_name"
+TEST_JOB_RUN_ID = "a1234"
+TEST_JOB_RUN_STATUS = "SUCCEEDED"
+
+
+@pytest.fixture
+def trigger():
+ yield GlueDataBrewJobCompleteTrigger(
+ aws_conn_id="aws_default", job_name=TEST_JOB_NAME,
run_id=TEST_JOB_RUN_ID
+ )
+
+
+class TestGlueDataBrewJobCompleteTrigger:
+ def test_serialize(self, trigger):
+ class_path, args = trigger.serialize()
+
+ class_name = class_path.split(".")[-1]
+ clazz = globals()[class_name]
+ instance = clazz(**args)
+
+ class_path2, args2 = instance.serialize()
+
+ assert class_path == class_path2
+ assert args == args2
diff --git a/tests/providers/amazon/aws/waiters/test_glue_databrew.py b/tests/providers/amazon/aws/waiters/test_glue_databrew.py
new file mode 100644
index 0000000000..2393898102
--- /dev/null
+++ b/tests/providers/amazon/aws/waiters/test_glue_databrew.py
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from unittest import mock
+
+import boto3
+import pytest
+
+from airflow.providers.amazon.aws.hooks.glue_databrew import GlueDataBrewHook
+
+RUNNING_STATES = ["STARTING", "RUNNING", "STOPPING"]
+TERMINAL_STATES = ["STOPPED", "SUCCEEDED", "FAILED"]
+
+
+class TestCustomDataBrewWaiters:
+ """Test waiters from ``amazon/aws/waiters/glue.json``."""
+
+ JOB_NAME = "test_job"
+ RUN_ID = "123"
+
+ @pytest.fixture(autouse=True)
+ def setup_test_cases(self, monkeypatch):
+ self.client = boto3.client("databrew", region_name="eu-west-3")
+ monkeypatch.setattr(GlueDataBrewHook, "conn", self.client)
+
+ def test_service_waiters(self):
+ hook_waiters = GlueDataBrewHook(aws_conn_id=None).list_waiters()
+ assert "job_complete" in hook_waiters
+
+ @pytest.fixture
+ def mock_describe_job_runs(self):
+ """Mock ``GlueDataBrewHook.Client.describe_job_run`` method."""
+ with mock.patch.object(self.client, "describe_job_run") as m:
+ yield m
+
+ @staticmethod
+ def describe_jobs(status: str):
+ """
+ Helper function to generate a minimal DescribeJobRun response for a single job.
+
+ https://docs.aws.amazon.com/databrew/latest/dg/API_DescribeJobRun.html
+ """
+ return {"State": status}
+
+ def test_job_succeeded(self, mock_describe_job_runs):
+ """Test job succeeded"""
+ mock_describe_job_runs.side_effect = [
+ self.describe_jobs(RUNNING_STATES[1]),
+ self.describe_jobs(TERMINAL_STATES[1]),
+ ]
+ waiter = GlueDataBrewHook(aws_conn_id=None).get_waiter("job_complete")
+ waiter.wait(Name=self.JOB_NAME, RunId=self.RUN_ID, WaiterConfig={"Delay": 0.2, "MaxAttempts": 2})
diff --git a/tests/system/providers/amazon/aws/example_glue_databrew.py b/tests/system/providers/amazon/aws/example_glue_databrew.py
new file mode 100644
index 0000000000..08625b5611
--- /dev/null
+++ b/tests/system/providers/amazon/aws/example_glue_databrew.py
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pendulum
+
+from airflow.models.baseoperator import chain
+from airflow.models.dag import DAG
+from airflow.providers.amazon.aws.operators.glue_databrew import (
+ GlueDataBrewStartJobOperator,
+)
+from tests.system.providers.amazon.aws.utils import SystemTestContextBuilder
+
+DAG_ID = "example_databrew"
+
+sys_test_context_task = SystemTestContextBuilder().build()
+
+
+with DAG(DAG_ID, schedule="@once", start_date=pendulum.datetime(2023, 1, 1,
tz="UTC"), catchup=False) as dag:
+ test_context = sys_test_context_task()
+ env_id = test_context["ENV_ID"]
+
+ job_name = f"{env_id}-databrew-job"
+
+ # [START howto_operator_glue_databrew_start]
+ start_job = GlueDataBrewStartJobOperator(task_id="startjob",
deferrable=True, job_name=job_name, delay=15)
+ # [END howto_operator_glue_databrew_start]
+
+ chain(test_context, start_job)
+
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)