ashb commented on a change in pull request #6715: [AIRFLOW-5945] Make inbuilt OperatorLinks work when using Serialization URL: https://github.com/apache/airflow/pull/6715#discussion_r354279512
########## File path: tests/gcp/operators/test_bigquery.py ########## @@ -432,6 +433,93 @@ def test_bigquery_operator_defaults(self, mock_hook): ti.render_templates() self.assertTrue(isinstance(ti.task.sql, str)) + def test_bigquery_operator_extra_serialized_field_when_single_query(self): + with self.dag: + BigQueryOperator( + task_id=TASK_ID, + sql='SELECT * FROM test_table', + ) + # ToDo: Make OperatorLinks working for list of sql queries + serialized_dag = SerializedDAG.to_dict(self.dag) + self.assertIn("sql", serialized_dag["dag"]["tasks"][0]) + + dag = SerializedDAG.from_dict(serialized_dag) + simple_task = dag.task_dict[TASK_ID] + self.assertEqual(getattr(simple_task, "sql"), 'SELECT * FROM test_table') + + ######################################################### + # Verify Operator Links work with Serialized Operator + ######################################################### + + # Check Serialized version of operator link + self.assertEqual( + serialized_dag["dag"]["tasks"][0]["_operator_extra_links"], + [{'airflow.gcp.operators.bigquery.BigQueryConsoleLink': {}}] + ) + + # Check DeSerialized version of operator link + self.assertIsInstance(list(simple_task.operator_extra_links)[0], BigQueryConsoleLink) + + ti = TaskInstance(task=simple_task, execution_date=DEFAULT_DATE) + ti.xcom_push('job_id', 12345) + + # check for positive case + url = simple_task.get_extra_links(DEFAULT_DATE, BigQueryConsoleLink.name) + self.assertEqual(url, 'https://console.cloud.google.com/bigquery?j=12345') + + # check for negative case + url2 = simple_task.get_extra_links(datetime(2017, 1, 2), BigQueryConsoleLink.name) + self.assertEqual(url2, '') + + def test_bigquery_operator_extra_serialized_field_when_multiple_queries(self): + with self.dag: + BigQueryOperator( + task_id=TASK_ID, + sql=['SELECT * FROM test_table', 'SELECT * FROM test_table2'], + ) + serialized_dag = SerializedDAG.to_dict(self.dag) + self.assertIn("sql", serialized_dag["dag"]["tasks"][0]) + + dag = 
SerializedDAG.from_dict(serialized_dag)
+        simple_task = dag.task_dict[TASK_ID]

Review comment:

```suggestion
        deserialized_task = dag.task_dict[TASK_ID]
```

As we've currently got something else called SimpleTask and this isn't one of those.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services