This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e9b05f9f649 Handle CrawlerRunningException gracefully in GlueCrawlerOperator (#62016)
e9b05f9f649 is described below
commit e9b05f9f6494ad7d786b8b219b82bfd37b5ba01f
Author: bahram-cdt <[email protected]>
AuthorDate: Tue Feb 17 17:29:51 2026 +0100
Handle CrawlerRunningException gracefully in GlueCrawlerOperator (#62016)
* Handle CrawlerRunningException in GlueCrawlerOperator
When start_crawler() or update_crawler() is called while the crawler is
already running (e.g., from a retry, overlapping DAG run, or boto3 internal
retry after a timeout), the Glue API raises CrawlerRunningException. Previously
this propagated as an unhandled error, causing Airflow task failure despite the
crawler actually succeeding.
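This change adds an opt-in fail_on_already_running parameter to
GlueCrawlerOperator. It defaults to True, which preserves the previous
fail-fast behavior. When it is set to False, a CrawlerRunningException raised
by either update_crawler() or start_crawler() is caught and a warning is
logged. The operator then waits for the existing run to complete (when
waiting/deferral is enabled) instead of failing. When update_crawler() is
skipped this way, the new crawler configuration is not applied.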
* Update providers/amazon/src/airflow/providers/amazon/aws/operators/glue_crawler.py
Co-authored-by: Vincent <[email protected]>
---------
Co-authored-by: Vincent <[email protected]>
---
.../providers/amazon/aws/operators/glue_crawler.py | 38 ++++++-
.../unit/amazon/aws/operators/test_glue_crawler.py | 111 +++++++++++++++++++++
2 files changed, 147 insertions(+), 2 deletions(-)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/glue_crawler.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/glue_crawler.py
index 95d154095b5..7c8e0d54d5b 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/glue_crawler.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/glue_crawler.py
@@ -20,6 +20,8 @@ from __future__ import annotations
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any
+from botocore.exceptions import ClientError
+
from airflow.providers.amazon.aws.hooks.glue_crawler import GlueCrawlerHook
from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator
from airflow.providers.amazon.aws.triggers.glue_crawler import
GlueCrawlerCompleteTrigger
@@ -49,6 +51,12 @@ class GlueCrawlerOperator(AwsBaseOperator[GlueCrawlerHook]):
:param deferrable: If True, the operator will wait asynchronously for the
crawl to complete.
This implies waiting for completion. This mode requires aiobotocore
module to be installed.
(default: False)
+ :param fail_on_already_running: If True (default), the operator will raise
an exception when
+ ``start_crawler()`` or ``update_crawler()`` encounters a
``CrawlerRunningException``
+ (i.e., the crawler is already running). If False, the operator logs a
warning
+ and waits for the existing run to complete. Setting this to False is
useful for handling
+ retry-induced race conditions where boto3 retries trigger a second
``start_crawler()``
+ call after a network timeout on the first (successful) call. (default:
True)
:param aws_conn_id: The Airflow connection used for AWS credentials.
If this is ``None`` or empty then the default boto3 behaviour is used.
If
running Airflow in a distributed manner and aws_conn_id is None or
@@ -74,12 +82,14 @@ class GlueCrawlerOperator(AwsBaseOperator[GlueCrawlerHook]):
poll_interval: int = 5,
wait_for_completion: bool = True,
deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
+ fail_on_already_running: bool = True,
**kwargs,
):
super().__init__(**kwargs)
self.poll_interval = poll_interval
self.wait_for_completion = wait_for_completion
self.deferrable = deferrable
+ self.fail_on_already_running = fail_on_already_running
self.config = config
def execute(self, context: Context) -> str:
@@ -90,12 +100,36 @@ class GlueCrawlerOperator(AwsBaseOperator[GlueCrawlerHook]):
"""
crawler_name = self.config["Name"]
if self.hook.has_crawler(crawler_name):
- self.hook.update_crawler(**self.config)
+ try:
+ self.hook.update_crawler(**self.config)
+ except ClientError as e:
+ if (
+ not self.fail_on_already_running
+ and e.response["Error"]["Code"] ==
"CrawlerRunningException"
+ ):
+ self.log.warning(
+ "Crawler '%s' is currently running. "
+ "Skipping update and waiting for the existing run to
complete.",
+ crawler_name,
+ )
+ else:
+ raise
else:
self.hook.create_crawler(**self.config)
self.log.info("Triggering AWS Glue Crawler")
- self.hook.start_crawler(crawler_name)
+ try:
+ self.hook.start_crawler(crawler_name)
+ except ClientError as e:
+ if not self.fail_on_already_running and
e.response["Error"]["Code"] == "CrawlerRunningException":
+ self.log.warning(
+ "Crawler '%s' is already running. "
+ "Waiting for the existing run to complete instead of
failing.",
+ crawler_name,
+ )
+ else:
+ raise
+
if self.deferrable:
self.defer(
trigger=GlueCrawlerCompleteTrigger(
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_glue_crawler.py b/providers/amazon/tests/unit/amazon/aws/operators/test_glue_crawler.py
index ffe13481d47..f56c4d30b77 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_glue_crawler.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_glue_crawler.py
@@ -21,6 +21,7 @@ from typing import TYPE_CHECKING
from unittest import mock
import pytest
+from botocore.exceptions import ClientError
from moto import mock_aws
from airflow.providers.amazon.aws.hooks.glue_crawler import GlueCrawlerHook
@@ -179,3 +180,113 @@ class TestGlueCrawlerOperator:
def test_template_fields(self):
validate_template_fields(self.op)
+
+ @mock.patch.object(GlueCrawlerHook, "wait_for_crawler_completion")
+ @mock.patch.object(GlueCrawlerHook, "start_crawler")
+ @mock.patch.object(GlueCrawlerHook, "update_crawler")
+ @mock.patch.object(GlueCrawlerHook, "has_crawler")
+ def test_execute_crawler_running_on_start(self, mock_has_crawler,
mock_update, mock_start, mock_wait):
+ """CrawlerRunningException on start_crawler should be caught when
fail_on_already_running=False."""
+ mock_has_crawler.return_value = True
+ mock_start.side_effect = ClientError(
+ error_response={"Error": {"Code": "CrawlerRunningException",
"Message": "Already running"}},
+ operation_name="StartCrawler",
+ )
+ self.op.fail_on_already_running = False
+
+ crawler_name = self.op.execute({})
+
+ assert crawler_name == mock_crawler_name
+ mock_update.assert_called_once()
+ mock_start.assert_called_once_with(mock_crawler_name)
+ mock_wait.assert_called_once_with(crawler_name=mock_crawler_name,
poll_interval=5)
+
+ @mock.patch.object(GlueCrawlerHook, "start_crawler")
+ @mock.patch.object(GlueCrawlerHook, "update_crawler")
+ @mock.patch.object(GlueCrawlerHook, "has_crawler")
+ def test_execute_crawler_running_on_update(self, mock_has_crawler,
mock_update, mock_start):
+ """CrawlerRunningException on update_crawler should be caught when
fail_on_already_running=False."""
+ mock_has_crawler.return_value = True
+ mock_update.side_effect = ClientError(
+ error_response={"Error": {"Code": "CrawlerRunningException",
"Message": "Already running"}},
+ operation_name="UpdateCrawler",
+ )
+ self.op.fail_on_already_running = False
+ self.op.wait_for_completion = False
+
+ crawler_name = self.op.execute({})
+
+ assert crawler_name == mock_crawler_name
+ mock_update.assert_called_once()
+ mock_start.assert_called_once_with(mock_crawler_name)
+
+ @mock.patch.object(GlueCrawlerHook, "start_crawler")
+ @mock.patch.object(GlueCrawlerHook, "update_crawler")
+ @mock.patch.object(GlueCrawlerHook, "has_crawler")
+ def test_execute_other_client_error_on_start_raises(self,
mock_has_crawler, mock_update, mock_start):
+ """Non-CrawlerRunningException ClientError on start_crawler should
propagate."""
+ mock_has_crawler.return_value = True
+ mock_start.side_effect = ClientError(
+ error_response={"Error": {"Code": "EntityNotFoundException",
"Message": "Not found"}},
+ operation_name="StartCrawler",
+ )
+ self.op.wait_for_completion = False
+
+ with pytest.raises(ClientError) as exc_info:
+ self.op.execute({})
+
+ assert exc_info.value.response["Error"]["Code"] ==
"EntityNotFoundException"
+
+ @mock.patch.object(GlueCrawlerHook, "update_crawler")
+ @mock.patch.object(GlueCrawlerHook, "has_crawler")
+ def test_execute_other_client_error_on_update_raises(self,
mock_has_crawler, mock_update):
+ """Non-CrawlerRunningException ClientError on update_crawler should
propagate."""
+ mock_has_crawler.return_value = True
+ mock_update.side_effect = ClientError(
+ error_response={"Error": {"Code": "InvalidInputException",
"Message": "Bad config"}},
+ operation_name="UpdateCrawler",
+ )
+
+ with pytest.raises(ClientError) as exc_info:
+ self.op.execute({})
+
+ assert exc_info.value.response["Error"]["Code"] ==
"InvalidInputException"
+
+ @mock.patch.object(GlueCrawlerHook, "start_crawler")
+ @mock.patch.object(GlueCrawlerHook, "update_crawler")
+ @mock.patch.object(GlueCrawlerHook, "has_crawler")
+ def
test_execute_crawler_running_on_start_raises_when_fail_on_already_running(
+ self, mock_has_crawler, mock_update, mock_start
+ ):
+ """CrawlerRunningException on start_crawler re-raises by default
(fail_on_already_running=True)."""
+ mock_has_crawler.return_value = True
+ mock_start.side_effect = ClientError(
+ error_response={"Error": {"Code": "CrawlerRunningException",
"Message": "Already running"}},
+ operation_name="StartCrawler",
+ )
+ self.op.wait_for_completion = False
+
+ with pytest.raises(ClientError) as exc_info:
+ self.op.execute({})
+
+ assert exc_info.value.response["Error"]["Code"] ==
"CrawlerRunningException"
+
+ @mock.patch.object(GlueCrawlerHook, "start_crawler")
+ @mock.patch.object(GlueCrawlerHook, "update_crawler")
+ @mock.patch.object(GlueCrawlerHook, "has_crawler")
+ def
test_execute_crawler_running_on_update_raises_when_fail_on_already_running(
+ self, mock_has_crawler, mock_update, mock_start
+ ):
+ """CrawlerRunningException on update_crawler re-raises by default
(fail_on_already_running=True)."""
+ mock_has_crawler.return_value = True
+ mock_update.side_effect = ClientError(
+ error_response={"Error": {"Code": "CrawlerRunningException",
"Message": "Already running"}},
+ operation_name="UpdateCrawler",
+ )
+ self.op.fail_on_already_running = True
+
+ with pytest.raises(ClientError) as exc_info:
+ self.op.execute({})
+
+ assert exc_info.value.response["Error"]["Code"] ==
"CrawlerRunningException"
+ mock_start.assert_not_called()