This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e9b05f9f649 Handle CrawlerRunningException gracefully in GlueCrawlerOperator (#62016)
e9b05f9f649 is described below

commit e9b05f9f6494ad7d786b8b219b82bfd37b5ba01f
Author: bahram-cdt <[email protected]>
AuthorDate: Tue Feb 17 17:29:51 2026 +0100

    Handle CrawlerRunningException gracefully in GlueCrawlerOperator (#62016)
    
    * Handle CrawlerRunningException in GlueCrawlerOperator
    
    When start_crawler() or update_crawler() is called while the crawler is
    already running (e.g., from a task retry, an overlapping DAG run, or a boto3
    internal retry after a network timeout), the Glue API raises
    CrawlerRunningException. Previously this propagated as an unhandled error and
    failed the Airflow task, even though a crawl was in fact running. In the
    boto3-retry case, that crawl is the one this task had just started
    successfully.
    
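    This change adds a fail_on_already_running parameter to GlueCrawlerOperator.
    It defaults to True, which preserves the previous behavior of re-raising the
    error. When it is set to False, a CrawlerRunningException raised by
    update_crawler() or start_crawler() is caught and a warning is logged. The
    operator then waits for the existing run to complete instead of failing.
    Note that when update_crawler() is skipped this way, the new crawler
    configuration is not applied to the run being waited on.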
    
    * Update 
providers/amazon/src/airflow/providers/amazon/aws/operators/glue_crawler.py
    
    Co-authored-by: Vincent <[email protected]>
    
    ---------
    
    Co-authored-by: Vincent <[email protected]>
---
 .../providers/amazon/aws/operators/glue_crawler.py |  38 ++++++-
 .../unit/amazon/aws/operators/test_glue_crawler.py | 111 +++++++++++++++++++++
 2 files changed, 147 insertions(+), 2 deletions(-)

diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/glue_crawler.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/glue_crawler.py
index 95d154095b5..7c8e0d54d5b 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/glue_crawler.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/glue_crawler.py
@@ -20,6 +20,8 @@ from __future__ import annotations
 from collections.abc import Sequence
 from typing import TYPE_CHECKING, Any
 
+from botocore.exceptions import ClientError
+
 from airflow.providers.amazon.aws.hooks.glue_crawler import GlueCrawlerHook
 from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator
 from airflow.providers.amazon.aws.triggers.glue_crawler import 
GlueCrawlerCompleteTrigger
@@ -49,6 +51,12 @@ class GlueCrawlerOperator(AwsBaseOperator[GlueCrawlerHook]):
     :param deferrable: If True, the operator will wait asynchronously for the 
crawl to complete.
         This implies waiting for completion. This mode requires aiobotocore 
module to be installed.
         (default: False)
+    :param fail_on_already_running: If True (default), the operator will raise 
an exception when
+        ``start_crawler()`` or ``update_crawler()`` encounters a 
``CrawlerRunningException``
+        (i.e., the crawler is already running). If False, the operator logs a 
warning
+        and waits for the existing run to complete. Setting this to False is 
useful for handling
+        retry-induced race conditions where boto3 retries trigger a second 
``start_crawler()``
+        call after a network timeout on the first (successful) call. (default: 
True)
     :param aws_conn_id: The Airflow connection used for AWS credentials.
         If this is ``None`` or empty then the default boto3 behaviour is used. 
If
         running Airflow in a distributed manner and aws_conn_id is None or
@@ -74,12 +82,14 @@ class GlueCrawlerOperator(AwsBaseOperator[GlueCrawlerHook]):
         poll_interval: int = 5,
         wait_for_completion: bool = True,
         deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
+        fail_on_already_running: bool = True,
         **kwargs,
     ):
         super().__init__(**kwargs)
         self.poll_interval = poll_interval
         self.wait_for_completion = wait_for_completion
         self.deferrable = deferrable
+        self.fail_on_already_running = fail_on_already_running
         self.config = config
 
     def execute(self, context: Context) -> str:
@@ -90,12 +100,36 @@ class 
GlueCrawlerOperator(AwsBaseOperator[GlueCrawlerHook]):
         """
         crawler_name = self.config["Name"]
         if self.hook.has_crawler(crawler_name):
-            self.hook.update_crawler(**self.config)
+            try:
+                self.hook.update_crawler(**self.config)
+            except ClientError as e:
+                if (
+                    not self.fail_on_already_running
+                    and e.response["Error"]["Code"] == 
"CrawlerRunningException"
+                ):
+                    self.log.warning(
+                        "Crawler '%s' is currently running. "
+                        "Skipping update and waiting for the existing run to 
complete.",
+                        crawler_name,
+                    )
+                else:
+                    raise
         else:
             self.hook.create_crawler(**self.config)
 
         self.log.info("Triggering AWS Glue Crawler")
-        self.hook.start_crawler(crawler_name)
+        try:
+            self.hook.start_crawler(crawler_name)
+        except ClientError as e:
+            if not self.fail_on_already_running and 
e.response["Error"]["Code"] == "CrawlerRunningException":
+                self.log.warning(
+                    "Crawler '%s' is already running. "
+                    "Waiting for the existing run to complete instead of 
failing.",
+                    crawler_name,
+                )
+            else:
+                raise
+
         if self.deferrable:
             self.defer(
                 trigger=GlueCrawlerCompleteTrigger(
diff --git 
a/providers/amazon/tests/unit/amazon/aws/operators/test_glue_crawler.py 
b/providers/amazon/tests/unit/amazon/aws/operators/test_glue_crawler.py
index ffe13481d47..f56c4d30b77 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_glue_crawler.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_glue_crawler.py
@@ -21,6 +21,7 @@ from typing import TYPE_CHECKING
 from unittest import mock
 
 import pytest
+from botocore.exceptions import ClientError
 from moto import mock_aws
 
 from airflow.providers.amazon.aws.hooks.glue_crawler import GlueCrawlerHook
@@ -179,3 +180,113 @@ class TestGlueCrawlerOperator:
 
     def test_template_fields(self):
         validate_template_fields(self.op)
+
+    @mock.patch.object(GlueCrawlerHook, "wait_for_crawler_completion")
+    @mock.patch.object(GlueCrawlerHook, "start_crawler")
+    @mock.patch.object(GlueCrawlerHook, "update_crawler")
+    @mock.patch.object(GlueCrawlerHook, "has_crawler")
+    def test_execute_crawler_running_on_start(self, mock_has_crawler, 
mock_update, mock_start, mock_wait):
+        """CrawlerRunningException on start_crawler should be caught when 
fail_on_already_running=False."""
+        mock_has_crawler.return_value = True
+        mock_start.side_effect = ClientError(
+            error_response={"Error": {"Code": "CrawlerRunningException", 
"Message": "Already running"}},
+            operation_name="StartCrawler",
+        )
+        self.op.fail_on_already_running = False
+
+        crawler_name = self.op.execute({})
+
+        assert crawler_name == mock_crawler_name
+        mock_update.assert_called_once()
+        mock_start.assert_called_once_with(mock_crawler_name)
+        mock_wait.assert_called_once_with(crawler_name=mock_crawler_name, 
poll_interval=5)
+
+    @mock.patch.object(GlueCrawlerHook, "start_crawler")
+    @mock.patch.object(GlueCrawlerHook, "update_crawler")
+    @mock.patch.object(GlueCrawlerHook, "has_crawler")
+    def test_execute_crawler_running_on_update(self, mock_has_crawler, 
mock_update, mock_start):
+        """CrawlerRunningException on update_crawler should be caught when 
fail_on_already_running=False."""
+        mock_has_crawler.return_value = True
+        mock_update.side_effect = ClientError(
+            error_response={"Error": {"Code": "CrawlerRunningException", 
"Message": "Already running"}},
+            operation_name="UpdateCrawler",
+        )
+        self.op.fail_on_already_running = False
+        self.op.wait_for_completion = False
+
+        crawler_name = self.op.execute({})
+
+        assert crawler_name == mock_crawler_name
+        mock_update.assert_called_once()
+        mock_start.assert_called_once_with(mock_crawler_name)
+
+    @mock.patch.object(GlueCrawlerHook, "start_crawler")
+    @mock.patch.object(GlueCrawlerHook, "update_crawler")
+    @mock.patch.object(GlueCrawlerHook, "has_crawler")
+    def test_execute_other_client_error_on_start_raises(self, 
mock_has_crawler, mock_update, mock_start):
+        """Non-CrawlerRunningException ClientError on start_crawler should 
propagate."""
+        mock_has_crawler.return_value = True
+        mock_start.side_effect = ClientError(
+            error_response={"Error": {"Code": "EntityNotFoundException", 
"Message": "Not found"}},
+            operation_name="StartCrawler",
+        )
+        self.op.wait_for_completion = False
+
+        with pytest.raises(ClientError) as exc_info:
+            self.op.execute({})
+
+        assert exc_info.value.response["Error"]["Code"] == 
"EntityNotFoundException"
+
+    @mock.patch.object(GlueCrawlerHook, "update_crawler")
+    @mock.patch.object(GlueCrawlerHook, "has_crawler")
+    def test_execute_other_client_error_on_update_raises(self, 
mock_has_crawler, mock_update):
+        """Non-CrawlerRunningException ClientError on update_crawler should 
propagate."""
+        mock_has_crawler.return_value = True
+        mock_update.side_effect = ClientError(
+            error_response={"Error": {"Code": "InvalidInputException", 
"Message": "Bad config"}},
+            operation_name="UpdateCrawler",
+        )
+
+        with pytest.raises(ClientError) as exc_info:
+            self.op.execute({})
+
+        assert exc_info.value.response["Error"]["Code"] == 
"InvalidInputException"
+
+    @mock.patch.object(GlueCrawlerHook, "start_crawler")
+    @mock.patch.object(GlueCrawlerHook, "update_crawler")
+    @mock.patch.object(GlueCrawlerHook, "has_crawler")
+    def 
test_execute_crawler_running_on_start_raises_when_fail_on_already_running(
+        self, mock_has_crawler, mock_update, mock_start
+    ):
+        """CrawlerRunningException on start_crawler re-raises by default 
(fail_on_already_running=True)."""
+        mock_has_crawler.return_value = True
+        mock_start.side_effect = ClientError(
+            error_response={"Error": {"Code": "CrawlerRunningException", 
"Message": "Already running"}},
+            operation_name="StartCrawler",
+        )
+        self.op.wait_for_completion = False
+
+        with pytest.raises(ClientError) as exc_info:
+            self.op.execute({})
+
+        assert exc_info.value.response["Error"]["Code"] == 
"CrawlerRunningException"
+
+    @mock.patch.object(GlueCrawlerHook, "start_crawler")
+    @mock.patch.object(GlueCrawlerHook, "update_crawler")
+    @mock.patch.object(GlueCrawlerHook, "has_crawler")
+    def 
test_execute_crawler_running_on_update_raises_when_fail_on_already_running(
+        self, mock_has_crawler, mock_update, mock_start
+    ):
+        """CrawlerRunningException on update_crawler re-raises by default 
(fail_on_already_running=True)."""
+        mock_has_crawler.return_value = True
+        mock_update.side_effect = ClientError(
+            error_response={"Error": {"Code": "CrawlerRunningException", 
"Message": "Already running"}},
+            operation_name="UpdateCrawler",
+        )
+        self.op.fail_on_already_running = True
+
+        with pytest.raises(ClientError) as exc_info:
+            self.op.execute({})
+
+        assert exc_info.value.response["Error"]["Code"] == 
"CrawlerRunningException"
+        mock_start.assert_not_called()

Reply via email to