kentdanas opened a new issue #13354:
URL: https://github.com/apache/airflow/issues/13354


   **Apache Airflow version**: 2.0
   
   **What happened**:
   
   The 
[AzureDataExplorerQueryOperator](https://github.com/apache/airflow/blob/master/airflow/providers/microsoft/azure/operators/adx.py)
 fails because it attempts to return the results of the query but the results 
can't be pushed to xcom.
   
   ```
   2020-12-28 23:10:08,458] {xcom.py:238} ERROR - Could not serialize the XCom 
value into JSON. If you are using pickles instead of JSON for XCom, then you 
need to enable pickle support for XCom in your airflow config.
   [2020-12-28 23:10:08,459] {taskinstance.py:1454} ERROR - Object of type 
KustoResultTable is not JSON serializable
   Traceback (most recent call last):
     File 
"/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 
1112, in _run_raw_task
       self._prepare_and_execute_task_with_callbacks(context, task)
     File 
"/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 
1284, in _prepare_and_execute_task_with_callbacks
       result = self._execute_task(context, task_copy)
     File 
"/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 
1317, in _execute_task
       self.xcom_push(key=XCOM_RETURN_KEY, value=result)
     File "/usr/local/lib/python3.7/site-packages/airflow/utils/session.py", 
line 65, in wrapper
       return func(*args, session=session, **kwargs)
     File 
"/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 
1898, in xcom_push
       session=session,
     File "/usr/local/lib/python3.7/site-packages/airflow/utils/session.py", 
line 62, in wrapper
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.7/site-packages/airflow/models/xcom.py", line 
88, in set
       value = XCom.serialize_value(value)
     File "/usr/local/lib/python3.7/site-packages/airflow/models/xcom.py", line 
235, in serialize_value
       return json.dumps(value).encode('UTF-8')
     File "/usr/local/lib/python3.7/json/__init__.py", line 231, in dumps
       return _default_encoder.encode(obj)
     File "/usr/local/lib/python3.7/json/encoder.py", line 199, in encode
       chunks = self.iterencode(o, _one_shot=True)
     File "/usr/local/lib/python3.7/json/encoder.py", line 257, in iterencode
       return _iterencode(o, 0)
     File "/usr/local/lib/python3.7/json/encoder.py", line 179, in default
       raise TypeError(f'Object of type {o.__class__.__name__} '
   TypeError: Object of type KustoResultTable is not JSON serializable
   [2020-12-28 23:10:08,461] {taskinstance.py:1502} INFO - Marking task as 
UP_FOR_RETRY. dag_id=azure_data_explorer, task_id=adx_query, 
execution_date=20201228T231006, start_date=20201228T231007, 
end_date=20201228T231008
   [2020-12-28 23:10:08,503] {local_task_job.py:142} INFO - Task exited with 
return code 1
   ```
   I get the same error no matter what query I run. Execution of the query 
works fine, but it appears to fail on the final return; from the operator code: 
```return response.primary_results[0]```
   
   Not sure if there's a way to change the query output format by passing in 
`options` to the operator, but if there is I couldn't find anything. The 
operator also doesn't have an `xcom_push` param, so I couldn't find a way to 
turn off this functionality.
   
   **What you expected to happen**:
   
   Operator would push results of query to xcom and task would complete 
successfully. 
   
   **How to reproduce it**:
   Call the operator with basic input params as shown in the docs, something 
like this:
   
   ```
   from airflow.providers.microsoft.azure.operators.adx import 
AzureDataExplorerQueryOperator
   
   adx_query = '''StormEvents
   | sort by StartTime desc
   | take 10'''
   
   opr_adx_query = AzureDataExplorerQueryOperator(
           task_id='adx_query',
           query=adx_query,
           database='storm_demo',
           azure_data_explorer_conn_id='adx'
       )
   ```
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to