ashb commented on a change in pull request #6715: [AIRFLOW-5945] Make inbuilt 
OperatorLinks work when using Serialization
URL: https://github.com/apache/airflow/pull/6715#discussion_r354279512
 
 

 ##########
 File path: tests/gcp/operators/test_bigquery.py
 ##########
 @@ -432,6 +433,93 @@ def test_bigquery_operator_defaults(self, mock_hook):
         ti.render_templates()
         self.assertTrue(isinstance(ti.task.sql, str))
 
+    def test_bigquery_operator_extra_serialized_field_when_single_query(self):
+        with self.dag:
+            BigQueryOperator(
+                task_id=TASK_ID,
+                sql='SELECT * FROM test_table',
+            )
+        # ToDo: Make OperatorLinks working for list of sql queries
+        serialized_dag = SerializedDAG.to_dict(self.dag)
+        self.assertIn("sql", serialized_dag["dag"]["tasks"][0])
+
+        dag = SerializedDAG.from_dict(serialized_dag)
+        simple_task = dag.task_dict[TASK_ID]
+        self.assertEqual(getattr(simple_task, "sql"), 'SELECT * FROM 
test_table')
+
+        #########################################################
+        # Verify Operator Links work with Serialized Operator
+        #########################################################
+
+        # Check Serialized version of operator link
+        self.assertEqual(
+            serialized_dag["dag"]["tasks"][0]["_operator_extra_links"],
+            [{'airflow.gcp.operators.bigquery.BigQueryConsoleLink': {}}]
+        )
+
+        # Check DeSerialized version of operator link
+        self.assertIsInstance(list(simple_task.operator_extra_links)[0], 
BigQueryConsoleLink)
+
+        ti = TaskInstance(task=simple_task, execution_date=DEFAULT_DATE)
+        ti.xcom_push('job_id', 12345)
+
+        # check for positive case
+        url = simple_task.get_extra_links(DEFAULT_DATE, 
BigQueryConsoleLink.name)
+        self.assertEqual(url, 
'https://console.cloud.google.com/bigquery?j=12345')
+
+        # check for negative case
+        url2 = simple_task.get_extra_links(datetime(2017, 1, 2), 
BigQueryConsoleLink.name)
+        self.assertEqual(url2, '')
+
+    def 
test_bigquery_operator_extra_serialized_field_when_multiple_queries(self):
+        with self.dag:
+            BigQueryOperator(
+                task_id=TASK_ID,
+                sql=['SELECT * FROM test_table', 'SELECT * FROM test_table2'],
+            )
+        serialized_dag = SerializedDAG.to_dict(self.dag)
+        self.assertIn("sql", serialized_dag["dag"]["tasks"][0])
+
+        dag = SerializedDAG.from_dict(serialized_dag)
+        simple_task = dag.task_dict[TASK_ID]
 
 Review comment:
   ```suggestion
           deserialized_task = dag.task_dict[TASK_ID]
   ```
   
   As we've currently got something else called SimpleTask and this isn't one 
of those.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to