This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5df99d6 New generic tableau operator: TableauOperator (#16915)
5df99d6 is described below
commit 5df99d6c690fbdd728c9fd9482ec9a7479dfd3c2
Author: Mario Taddeucci <[email protected]>
AuthorDate: Mon Aug 9 05:01:48 2021 -0300
New generic tableau operator: TableauOperator (#16915)
---
.../tableau/example_dags/example_tableau.py | 74 +++++++++
airflow/providers/tableau/operators/tableau.py | 144 ++++++++++++++++++
.../tableau/operators/tableau_refresh_workbook.py | 54 +++----
airflow/providers/tableau/provider.yaml | 3 +
docs/apache-airflow-providers-tableau/index.rst | 1 +
.../apache-airflow-providers-tableau/operators.rst | 74 +++++++++
tests/providers/tableau/operators/test_tableau.py | 165 +++++++++++++++++++++
.../operators/test_tableau_refresh_workbook.py | 6 +-
8 files changed, 486 insertions(+), 35 deletions(-)
diff --git a/airflow/providers/tableau/example_dags/example_tableau.py
b/airflow/providers/tableau/example_dags/example_tableau.py
new file mode 100644
index 0000000..04d87d2
--- /dev/null
+++ b/airflow/providers/tableau/example_dags/example_tableau.py
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This is an example dag that performs two refresh operations on a Tableau
Workbook aka Extract. The first one
+waits until it succeeds. The second does not wait since this is an
asynchronous operation and we don't know
+when the operation actually finishes. That's why we have another task that
checks only that.
+"""
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.providers.tableau.operators.tableau import TableauOperator
+from airflow.providers.tableau.sensors.tableau_job_status import
TableauJobStatusSensor
+from airflow.utils.dates import days_ago
+
+DEFAULT_ARGS = {
+ 'owner': 'airflow',
+ 'depends_on_past': False,
+ 'email': ['[email protected]'],
+ 'email_on_failure': False,
+ 'email_on_retry': False,
+}
+
+with DAG(
+ dag_id='example_tableau',
+ default_args=DEFAULT_ARGS,
+ dagrun_timeout=timedelta(hours=2),
+ schedule_interval=None,
+ start_date=days_ago(2),
+ tags=['example'],
+) as dag:
+ # Refreshes a workbook and waits until it succeeds.
+ # [START howto_operator_tableau]
+ task_refresh_workbook_blocking = TableauOperator(
+ resource='workbooks',
+ method='refresh',
+ find='MyWorkbook',
+ match_with='name',
+ site_id='my_site',
+ blocking_refresh=True,
+ task_id='refresh_tableau_workbook_blocking',
+ )
+ # [END howto_operator_tableau]
+ # Refreshes a workbook and does not wait until it succeeds.
+ task_refresh_workbook_non_blocking = TableauOperator(
+ resource='workbooks',
+ method='refresh',
+ find='MyWorkbook',
+ match_with='name',
+ site_id='my_site',
+ blocking_refresh=False,
+ task_id='refresh_tableau_workbook_non_blocking',
+ )
+ # The following task queries the status of the workbook refresh job until
it succeeds.
+ task_check_job_status = TableauJobStatusSensor(
+ site_id='my_site',
+ job_id="{{
ti.xcom_pull(task_ids='refresh_tableau_workbook_non_blocking') }}",
+ task_id='check_tableau_job_status',
+ )
+ task_refresh_workbook_non_blocking >> task_check_job_status
diff --git a/airflow/providers/tableau/operators/tableau.py
b/airflow/providers/tableau/operators/tableau.py
new file mode 100644
index 0000000..fc93e7a
--- /dev/null
+++ b/airflow/providers/tableau/operators/tableau.py
@@ -0,0 +1,144 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.tableau.hooks.tableau import (
+ TableauHook,
+ TableauJobFailedException,
+ TableauJobFinishCode,
+)
+
+RESOURCES_METHODS = {
+ 'datasources': ['delete', 'refresh'],
+ 'groups': ['delete'],
+ 'projects': ['delete'],
+ 'schedule': ['delete'],
+ 'sites': ['delete'],
+ 'subscriptions': ['delete'],
+ 'tasks': ['delete', 'run'],
+ 'users': ['remove'],
+ 'workbooks': ['delete', 'refresh'],
+}
+
+
+class TableauOperator(BaseOperator):
+ """
+ Execute a Tableau API Resource
+ https://tableau.github.io/server-client-python/docs/api-ref
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:TableauOperator`
+
+ :param resource: The name of the resource to use.
+ :type resource: str
+ :param method: The name of the resource's method to execute.
+ :type method: str
+ :param find: The reference of resource that will receive the action.
+ :type find: str
+ :param match_with: The resource field name to be matched with find
parameter.
+ :type match_with: Optional[str]
+ :param site_id: The id of the site where the workbook belongs to.
+ :type site_id: Optional[str]
+ :param blocking_refresh: By default will be blocking means it will wait
until it has finished.
+ :type blocking_refresh: bool
+ :param check_interval: time in seconds that the job should wait in
+ between each instance state checks until operation is completed
+ :type check_interval: float
+ :param tableau_conn_id: The :ref:`Tableau Connection id
<howto/connection:tableau>`
+ containing the credentials to authenticate to the Tableau Server.
+ :type tableau_conn_id: str
+ """
+
+ def __init__(
+ self,
+ *,
+ resource: str,
+ method: str,
+ find: str,
+ match_with: str = 'id',
+ site_id: Optional[str] = None,
+ blocking_refresh: bool = True,
+ check_interval: float = 20,
+ tableau_conn_id: str = 'tableau_default',
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.resource = resource
+ self.method = method
+ self.find = find
+ self.match_with = match_with
+ self.check_interval = check_interval
+ self.site_id = site_id
+ self.blocking_refresh = blocking_refresh
+ self.tableau_conn_id = tableau_conn_id
+
+ def execute(self, context: dict) -> str:
+ """
+ Executes the Tableau API resource and pushes the job id or downloaded
file URI to xcom.
+ :param context: The task context during execution.
+ :type context: dict
+ :return: the id of the job that executes the extract refresh or
downloaded file URI.
+ :rtype: str
+ """
+ available_resources = RESOURCES_METHODS.keys()
+ if self.resource not in available_resources:
+ error_message = f'Resource not found! Available Resources:
{available_resources}'
+ raise AirflowException(error_message)
+
+ available_methods = RESOURCES_METHODS[self.resource]
+ if self.method not in available_methods:
+ error_message = f'Method not found! Available methods for
{self.resource}: {available_methods}'
+ raise AirflowException(error_message)
+
+ with TableauHook(self.site_id, self.tableau_conn_id) as tableau_hook:
+
+ resource = getattr(tableau_hook.server, self.resource)
+ method = getattr(resource, self.method)
+
+ resource_id = self._get_resource_id(tableau_hook)
+
+ response = method(resource_id)
+
+ if self.method == 'refresh':
+
+ job_id = response.id
+
+ if self.blocking_refresh:
+ if not tableau_hook.wait_for_state(
+ job_id=job_id,
+ check_interval=self.check_interval,
+ target_state=TableauJobFinishCode.SUCCESS,
+ ):
+ raise TableauJobFailedException(f'The Tableau Refresh
{self.resource} Job failed!')
+
+ return job_id
+
+ def _get_resource_id(self, tableau_hook: TableauHook) -> str:
+
+ if self.match_with == 'id':
+ return self.find
+
+ for resource in tableau_hook.get_all(resource_name=self.resource):
+ if getattr(resource, self.match_with) == self.find:
+ resource_id = resource.id
+ self.log.info('Found matching with id %s', resource_id)
+ return resource_id
+
+ raise AirflowException(f'{self.resource} with {self.match_with}
{self.find} not found!')
diff --git a/airflow/providers/tableau/operators/tableau_refresh_workbook.py
b/airflow/providers/tableau/operators/tableau_refresh_workbook.py
index 6e09367..cecef1a 100644
--- a/airflow/providers/tableau/operators/tableau_refresh_workbook.py
+++ b/airflow/providers/tableau/operators/tableau_refresh_workbook.py
@@ -14,21 +14,23 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import warnings
from typing import Optional
-from tableauserverclient import WorkbookItem
-
-from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
-from airflow.providers.tableau.hooks.tableau import (
- TableauHook,
- TableauJobFailedException,
- TableauJobFinishCode,
+from airflow.providers.tableau.operators.tableau import TableauOperator
+
+warnings.warn(
+ """This operator is deprecated. Please use
`airflow.providers.tableau.operators.tableau`.""",
+ DeprecationWarning,
+ stacklevel=2,
)
class TableauRefreshWorkbookOperator(BaseOperator):
"""
+ This operator is deprecated. Please use
`airflow.providers.tableau.operators.tableau`.
+
Refreshes a Tableau Workbook/Extract
.. seealso::
https://tableau.github.io/server-client-python/docs/api-ref#workbooks
@@ -75,29 +77,17 @@ class TableauRefreshWorkbookOperator(BaseOperator):
:return: the id of the job that executes the extract refresh
:rtype: str
"""
- with TableauHook(self.site_id, self.tableau_conn_id) as tableau_hook:
- workbook = self._get_workbook_by_name(tableau_hook)
-
- job_id = self._refresh_workbook(tableau_hook, workbook.id)
- if self.blocking:
- if not tableau_hook.wait_for_state(
- job_id=job_id,
- check_interval=self.check_interval,
- target_state=TableauJobFinishCode.SUCCESS,
- ):
- raise TableauJobFailedException('The Tableau Refresh
Workbook Job failed!')
-
- self.log.info('Workbook %s has been successfully refreshed.',
self.workbook_name)
- return job_id
-
- def _get_workbook_by_name(self, tableau_hook: TableauHook) -> WorkbookItem:
- for workbook in tableau_hook.get_all(resource_name='workbooks'):
- if workbook.name == self.workbook_name:
- self.log.info('Found matching workbook with id %s',
workbook.id)
- return workbook
- raise AirflowException(f'Workbook {self.workbook_name} not found!')
+ job_id = TableauOperator(
+ resource='workbooks',
+ method='refresh',
+ find=self.workbook_name,
+ match_with='name',
+ site_id=self.site_id,
+ tableau_conn_id=self.tableau_conn_id,
+ blocking_refresh=self.blocking,
+ check_interval=self.check_interval,
+ task_id='refresh_workbook',
+ dag=None,
+ ).execute(context={})
- def _refresh_workbook(self, tableau_hook: TableauHook, workbook_id: str)
-> str:
- job = tableau_hook.server.workbooks.refresh(workbook_id)
- self.log.info('Refreshing Workbook %s...', self.workbook_name)
- return job.id
+ return job_id
diff --git a/airflow/providers/tableau/provider.yaml
b/airflow/providers/tableau/provider.yaml
index 5adb183..f773a0d 100644
--- a/airflow/providers/tableau/provider.yaml
+++ b/airflow/providers/tableau/provider.yaml
@@ -32,12 +32,15 @@ additional-dependencies:
integrations:
- integration-name: Tableau
external-doc-url: https://www.tableau.com/
+ how-to-guide:
+ - /docs/apache-airflow-providers-tableau/operators.rst
logo: /integration-logos/tableau/tableau.png
tags: [service]
operators:
- integration-name: Tableau
python-modules:
+ - airflow.providers.tableau.operators.tableau
- airflow.providers.tableau.operators.tableau_refresh_workbook
sensors:
diff --git a/docs/apache-airflow-providers-tableau/index.rst
b/docs/apache-airflow-providers-tableau/index.rst
index 30c1e05..47d70a2 100644
--- a/docs/apache-airflow-providers-tableau/index.rst
+++ b/docs/apache-airflow-providers-tableau/index.rst
@@ -27,6 +27,7 @@ Content
:caption: References
Connection Types <connections/tableau>
+ Operators <operators>
Python API <_api/airflow/providers/tableau/index>
.. toctree::
diff --git a/docs/apache-airflow-providers-tableau/operators.rst
b/docs/apache-airflow-providers-tableau/operators.rst
new file mode 100644
index 0000000..c7a8184
--- /dev/null
+++ b/docs/apache-airflow-providers-tableau/operators.rst
@@ -0,0 +1,74 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+
+.. _howto/operator:TableauOperator:
+
+TableauOperator
+===============
+
+Use the :class:`~airflow.providers.tableau.operators.TableauOperator` to
execute
+Tableau server client python commands in a `Tableau
<https://tableau.github.io/server-client-python/docs/api-ref>`__.
+
+
+Using the Operator
+^^^^^^^^^^^^^^^^^^
+
+| **resource**: The name of the resource to use. **str**
+| **method**: The name of the resource's method to execute. **str**
+| **find**: The reference of resource that will receive the action. **str**
+| **match_with**: The resource field name to be matched with find parameter.
**str** - Default: **id**
+| **site_id**: The id of the site where the workbook belongs to. **str** -
Default: **None**
+| **blocking_refresh**: By default the extract refresh will be blocking means
it will wait until it has finished. **bool** - Default: **True**
+| **check_interval**: time in seconds that the job should wait in between each
instance state checks until operation is completed. **float** - Default: **20**
+| **tableau_conn_id**: The credentials to authenticate to the Tableau Server.
**str** - Default: **tableau_default**
+|
+|
+
+
+
+.. list-table:: Available methods by resource
+ :widths: 15 15
+ :header-rows: 1
+
+ * - Resource
+ - Methods
+ * - **datasources**
+ - ``delete``, ``refresh``
+ * - **groups**
+ - ``delete``
+ * - **projects**
+ - ``delete``
+ * - **schedule**
+ - ``delete``
+ * - **sites**
+ - ``delete``
+ * - **tasks**
+ - ``delete``, ``run``
+ * - **users**
+ - ``remove``
+ * - **workbooks**
+ - ``delete``, ``refresh``
+
+
+An example usage of the TableauOperator is as follows:
+
+.. exampleinclude::
/../../airflow/providers/tableau/example_dags/example_tableau.py
+ :language: python
+ :start-after: [START howto_operator_tableau]
+ :end-before: [END howto_operator_tableau]
diff --git a/tests/providers/tableau/operators/test_tableau.py
b/tests/providers/tableau/operators/test_tableau.py
new file mode 100644
index 0000000..4d8e20f
--- /dev/null
+++ b/tests/providers/tableau/operators/test_tableau.py
@@ -0,0 +1,165 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest.mock import Mock, patch
+
+import pytest
+
+from airflow.exceptions import AirflowException
+from airflow.providers.tableau.hooks.tableau import TableauJobFinishCode
+from airflow.providers.tableau.operators.tableau import TableauOperator
+
+
+class TestTableauOperator(unittest.TestCase):
+ """
+ Test class for TableauOperator
+ """
+
+ def setUp(self):
+ """
+ setup
+ """
+
+ self.mocked_workbooks = []
+ self.mock_datasources = []
+
+ for i in range(3):
+ mock_workbook = Mock()
+ mock_workbook.id = i
+ mock_workbook.name = f'wb_{i}'
+ self.mocked_workbooks.append(mock_workbook)
+
+ mock_datasource = Mock()
+ mock_datasource.id = i
+ mock_datasource.name = f'ds_{i}'
+ self.mock_datasources.append(mock_datasource)
+
+ self.kwargs = {
+ 'site_id': 'test_site',
+ 'task_id': 'task',
+ 'dag': None,
+ 'match_with': 'name',
+ 'method': 'refresh',
+ }
+
+ @patch('airflow.providers.tableau.operators.tableau.TableauHook')
+ def test_execute_workbooks(self, mock_tableau_hook):
+ """
+ Test Execute Workbooks
+ """
+ mock_tableau_hook.get_all = Mock(return_value=self.mocked_workbooks)
+ mock_tableau_hook.return_value.__enter__ =
Mock(return_value=mock_tableau_hook)
+ operator = TableauOperator(blocking_refresh=False, find='wb_2',
resource='workbooks', **self.kwargs)
+
+ job_id = operator.execute(context={})
+
+ mock_tableau_hook.server.workbooks.refresh.assert_called_once_with(2)
+ assert mock_tableau_hook.server.workbooks.refresh.return_value.id ==
job_id
+
+ @patch('airflow.providers.tableau.operators.tableau.TableauHook')
+ def test_execute_workbooks_blocking(self, mock_tableau_hook):
+ """
+ Test execute workbooks blocking
+ """
+ mock_tableau_hook.get_all = Mock(return_value=self.mocked_workbooks)
+ mock_tableau_hook.return_value.__enter__ =
Mock(return_value=mock_tableau_hook)
+ mock_tableau_hook.server.jobs.get_by_id = Mock(
+ return_value=Mock(finish_code=TableauJobFinishCode.SUCCESS.value)
+ )
+
+ operator = TableauOperator(find='wb_2', resource='workbooks',
**self.kwargs)
+
+ job_id = operator.execute(context={})
+
+ mock_tableau_hook.server.workbooks.refresh.assert_called_once_with(2)
+ assert mock_tableau_hook.server.workbooks.refresh.return_value.id ==
job_id
+ mock_tableau_hook.wait_for_state.assert_called_once_with(
+ job_id=job_id, check_interval=20,
target_state=TableauJobFinishCode.SUCCESS
+ )
+
+ @patch('airflow.providers.tableau.operators.tableau.TableauHook')
+ def test_execute_missing_workbook(self, mock_tableau_hook):
+ """
+ Test execute missing workbook
+ """
+ mock_tableau_hook.get_all = Mock(return_value=self.mocked_workbooks)
+ mock_tableau_hook.return_value.__enter__ =
Mock(return_value=mock_tableau_hook)
+ operator = TableauOperator(find='test', resource='workbooks',
**self.kwargs)
+
+ with pytest.raises(AirflowException):
+ operator.execute({})
+
+ @patch('airflow.providers.tableau.operators.tableau.TableauHook')
+ def test_execute_datasources(self, mock_tableau_hook):
+ """
+ Test Execute datasources
+ """
+ mock_tableau_hook.get_all = Mock(return_value=self.mock_datasources)
+ mock_tableau_hook.return_value.__enter__ =
Mock(return_value=mock_tableau_hook)
+ operator = TableauOperator(blocking_refresh=False, find='ds_2',
resource='datasources', **self.kwargs)
+
+ job_id = operator.execute(context={})
+
+ mock_tableau_hook.server.datasources.refresh.assert_called_once_with(2)
+ assert mock_tableau_hook.server.datasources.refresh.return_value.id ==
job_id
+
+ @patch('airflow.providers.tableau.operators.tableau.TableauHook')
+ def test_execute_datasources_blocking(self, mock_tableau_hook):
+ """
+ Test execute datasources blocking
+ """
+ mock_tableau_hook.get_all = Mock(return_value=self.mock_datasources)
+ mock_tableau_hook.return_value.__enter__ =
Mock(return_value=mock_tableau_hook)
+ operator = TableauOperator(find='ds_2', resource='datasources',
**self.kwargs)
+
+ job_id = operator.execute(context={})
+
+ mock_tableau_hook.server.datasources.refresh.assert_called_once_with(2)
+ assert mock_tableau_hook.server.datasources.refresh.return_value.id ==
job_id
+ mock_tableau_hook.wait_for_state.assert_called_once_with(
+ job_id=job_id, check_interval=20,
target_state=TableauJobFinishCode.SUCCESS
+ )
+
+ @patch('airflow.providers.tableau.operators.tableau.TableauHook')
+ def test_execute_missing_datasource(self, mock_tableau_hook):
+ """
+ Test execute missing datasource
+ """
+ mock_tableau_hook.get_all = Mock(return_value=self.mock_datasources)
+ mock_tableau_hook.return_value.__enter__ =
Mock(return_value=mock_tableau_hook)
+ operator = TableauOperator(find='test', resource='datasources',
**self.kwargs)
+
+ with pytest.raises(AirflowException):
+ operator.execute({})
+
+ def test_execute_unavailable_resource(self):
+ """
+ Test execute unavailable resource
+ """
+ operator = TableauOperator(resource='test', find='test', **self.kwargs)
+
+ with pytest.raises(AirflowException):
+ operator.execute({})
+
+ def test_get_resource_id(self):
+ """
+ Test get resource id
+ """
+ resource_id = 'res_id'
+ operator = TableauOperator(resource='task', find=resource_id,
method='run', task_id='t', dag=None)
+ assert operator._get_resource_id(resource_id) == resource_id
diff --git a/tests/providers/tableau/operators/test_tableau_refresh_workbook.py
b/tests/providers/tableau/operators/test_tableau_refresh_workbook.py
index a5f12a4..51420ce 100644
--- a/tests/providers/tableau/operators/test_tableau_refresh_workbook.py
+++ b/tests/providers/tableau/operators/test_tableau_refresh_workbook.py
@@ -42,7 +42,7 @@ class TestTableauRefreshWorkbookOperator(unittest.TestCase):
self.mocked_workbooks.append(mock_workbook)
self.kwargs = {'site_id': 'test_site', 'task_id': 'task', 'dag': None,
'check_interval': 1}
-
@patch('airflow.providers.tableau.operators.tableau_refresh_workbook.TableauHook')
+ @patch('airflow.providers.tableau.operators.tableau.TableauHook')
def test_execute(self, mock_tableau_hook):
"""
Test Execute
@@ -56,7 +56,7 @@ class TestTableauRefreshWorkbookOperator(unittest.TestCase):
mock_tableau_hook.server.workbooks.refresh.assert_called_once_with(2)
assert mock_tableau_hook.server.workbooks.refresh.return_value.id ==
job_id
-
@patch('airflow.providers.tableau.operators.tableau_refresh_workbook.TableauHook')
+ @patch('airflow.providers.tableau.operators.tableau.TableauHook')
def test_execute_blocking(self, mock_tableau_hook):
"""
Test execute blocking
@@ -76,7 +76,7 @@ class TestTableauRefreshWorkbookOperator(unittest.TestCase):
job_id=job_id, check_interval=1,
target_state=TableauJobFinishCode.SUCCESS
)
-
@patch('airflow.providers.tableau.operators.tableau_refresh_workbook.TableauHook')
+ @patch('airflow.providers.tableau.operators.tableau.TableauHook')
def test_execute_missing_workbook(self, mock_tableau_hook):
"""
Test execute missing workbook