This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 37a7b27242 `LookerStartPdtBuildOperator`, `LookerCheckPdtBuildSensor`
: fix empty materialization id handling (#23025)
37a7b27242 is described below
commit 37a7b27242fa06e0c805cbc01cf3cfe3557daf8e
Author: Aleksei Loginov <[email protected]>
AuthorDate: Tue Apr 26 03:01:42 2022 -0400
`LookerStartPdtBuildOperator`, `LookerCheckPdtBuildSensor` : fix empty
materialization id handling (#23025)
* fix empty materialization id handling
---
airflow/providers/google/cloud/operators/looker.py | 2 +-
airflow/providers/google/cloud/sensors/looker.py | 3 +++
.../google/cloud/operators/test_looker.py | 23 ++++++++++++++++++++++
.../providers/google/cloud/sensors/test_looker.py | 12 +++++++++++
4 files changed, 39 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/google/cloud/operators/looker.py
b/airflow/providers/google/cloud/operators/looker.py
index 00ff022978..917884ab66 100644
--- a/airflow/providers/google/cloud/operators/looker.py
+++ b/airflow/providers/google/cloud/operators/looker.py
@@ -84,7 +84,7 @@ class LookerStartPdtBuildOperator(BaseOperator):
self.materialization_id = resp.materialization_id
- if self.materialization_id is None:
+ if not self.materialization_id:
raise AirflowException(
f'No `materialization_id` was returned for model:
{self.model}, view: {self.view}.'
)
diff --git a/airflow/providers/google/cloud/sensors/looker.py
b/airflow/providers/google/cloud/sensors/looker.py
index d3ef1724b8..15844baa01 100644
--- a/airflow/providers/google/cloud/sensors/looker.py
+++ b/airflow/providers/google/cloud/sensors/looker.py
@@ -53,6 +53,9 @@ class LookerCheckPdtBuildSensor(BaseSensorOperator):
self.hook = LookerHook(looker_conn_id=self.looker_conn_id)
+ if not self.materialization_id:
+ raise AirflowException('Invalid `materialization_id`.')
+
# materialization_id is templated var pulling output from start task
status_dict =
self.hook.pdt_build_status(materialization_id=self.materialization_id)
status = status_dict['status']
diff --git a/tests/providers/google/cloud/operators/test_looker.py
b/tests/providers/google/cloud/operators/test_looker.py
index 58154ca865..cd6e9bd2ce 100644
--- a/tests/providers/google/cloud/operators/test_looker.py
+++ b/tests/providers/google/cloud/operators/test_looker.py
@@ -19,6 +19,9 @@ import unittest
from unittest import mock
from unittest.mock import MagicMock
+import pytest
+
+from airflow.exceptions import AirflowException
from airflow.models import DAG, DagBag
from airflow.providers.google.cloud.operators.looker import
LookerStartPdtBuildOperator
from airflow.utils.timezone import datetime
@@ -146,3 +149,23 @@ class TestLookerStartPdtBuildOperator(LookerTestBase):
task.cancel_on_kill = True
task.on_kill()
mock_hook.return_value.stop_pdt_build.assert_called_once_with(materialization_id=TEST_JOB_ID)
+
+ @mock.patch(OPERATOR_PATH.format("LookerHook"))
+ def test_materialization_id_returned_as_empty_str(self, mock_hook):
+ # mock return vals from hook
+ mock_hook.return_value.start_pdt_build.return_value.materialization_id
= ""
+ mock_hook.return_value.wait_for_job.return_value = None
+
+ # run task in mock context (asynchronous=False)
+ task = LookerStartPdtBuildOperator(
+ task_id=TASK_ID,
+ looker_conn_id=LOOKER_CONN_ID,
+ model=MODEL,
+ view=VIEW,
+ )
+
+ # check AirflowException is raised
+ with pytest.raises(
+ AirflowException, match=f'No `materialization_id` was returned for
model: {MODEL}, view: {VIEW}.'
+ ):
+ task.execute(context=self.mock_context)
diff --git a/tests/providers/google/cloud/sensors/test_looker.py
b/tests/providers/google/cloud/sensors/test_looker.py
index 4c11fd8a48..88d9d6fc9b 100644
--- a/tests/providers/google/cloud/sensors/test_looker.py
+++ b/tests/providers/google/cloud/sensors/test_looker.py
@@ -105,3 +105,15 @@ class TestLookerCheckPdtBuildSensor(unittest.TestCase):
# assert hook.pdt_build_status called once
mock_hook.return_value.pdt_build_status.assert_called_once_with(materialization_id=TEST_JOB_ID)
+
+ def test_empty_materialization_id(self):
+
+ # run task in mock context
+ sensor = LookerCheckPdtBuildSensor(
+ task_id=TASK_ID,
+ looker_conn_id=LOOKER_CONN_ID,
+ materialization_id="",
+ )
+
+ with pytest.raises(AirflowException, match="^Invalid
`materialization_id`.$"):
+ sensor.poke(context={})