pankajkoti commented on PR #31920:
URL: https://github.com/apache/airflow/pull/31920#issuecomment-1623629476

   Hi,
   
   I am trying to use Elasticsearch remote logging with Airflow Breeze locally.
   
   The following configuration previously worked for me (with apache-airflow-providers-elasticsearch==4.5.1) to fetch and render remote logs in the webserver UI task-log view, using these settings in `airflow.cfg`:
   
   ```
   [logging]
   remote_logging = True
   
   [elasticsearch]
   json_format = True
   host = host.docker.internal:9200
   host_field = host.name
   offset_field = log.offset
   ```
   
   
   However, on `main` (I believe this is the provider version about to be released with this change), I get the error below when the webserver tries to fetch the logs from ES:
   ```
   [2023-07-06T12:48:45.441+0000] {base.py:293} WARNING - POST http://host.docker.internal:9200/_all/_count [status:400 request:0.005s]
   [2023-07-06T12:48:45.441+0000] {es_task_handler.py:329} ERROR - Could not get current log size with log_id: example_xcom-push-scheduled__2023-07-04T00:00:00+00:00--1-1
   Traceback (most recent call last):
     File "/opt/airflow/airflow/providers/elasticsearch/log/es_task_handler.py", line 324, in es_read
       max_log_line = self.client.count(index=self.index_patterns, body=query)["count"]
     File "/usr/local/lib/python3.8/site-packages/elasticsearch/client/utils.py", line 168, in _wrapped
       return func(*args, params=params, headers=headers, **kwargs)
     File "/usr/local/lib/python3.8/site-packages/elasticsearch/client/__init__.py", line 549, in count
       return self.transport.perform_request(
     File "/usr/local/lib/python3.8/site-packages/elasticsearch/transport.py", line 458, in perform_request
       raise e
     File "/usr/local/lib/python3.8/site-packages/elasticsearch/transport.py", line 419, in perform_request
       status, headers_response, data = connection.perform_request(
     File "/usr/local/lib/python3.8/site-packages/elasticsearch/connection/http_urllib3.py", line 277, in perform_request
       self._raise_error(response.status, raw_data)
     File "/usr/local/lib/python3.8/site-packages/elasticsearch/connection/base.py", line 330, in _raise_error
       raise HTTP_EXCEPTIONS.get(status_code, TransportError)(
   elasticsearch.exceptions.RequestError: RequestError(400, 'parsing_exception', 'request does not support [sort]')
   [2023-07-06T12:48:45.446+0000] {app.py:1744} ERROR - Exception on /api/v1/dags/example_xcom/dagRuns/scheduled__2023-07-04T00:00:00+00:00/taskInstances/push/logs/1 [GET]
   Traceback (most recent call last):
     File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 2529, in wsgi_app
       response = self.full_dispatch_request()
     File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1825, in full_dispatch_request
       rv = self.handle_user_exception(e)
     File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1823, in full_dispatch_request
       rv = self.dispatch_request()
     File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1799, in dispatch_request
       return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
     File "/usr/local/lib/python3.8/site-packages/connexion/decorators/decorator.py", line 68, in wrapper
       response = function(request)
     File "/usr/local/lib/python3.8/site-packages/connexion/decorators/uri_parsing.py", line 149, in wrapper
       response = function(request)
     File "/usr/local/lib/python3.8/site-packages/connexion/decorators/validation.py", line 399, in wrapper
       return function(request)
     File "/usr/local/lib/python3.8/site-packages/connexion/decorators/response.py", line 113, in wrapper
       return _wrapper(request, response)
     File "/usr/local/lib/python3.8/site-packages/connexion/decorators/response.py", line 90, in _wrapper
       self.operation.api.get_connexion_response(response, self.mimetype)
     File "/usr/local/lib/python3.8/site-packages/connexion/apis/abstract.py", line 366, in get_connexion_response
       return cls._framework_to_connexion_response(response=response, mimetype=mimetype)
     File "/usr/local/lib/python3.8/site-packages/connexion/apis/flask_api.py", line 165, in _framework_to_connexion_response
       body=response.get_data() if not response.direct_passthrough else None,
     File "/usr/local/lib/python3.8/site-packages/werkzeug/wrappers/response.py", line 314, in get_data
       self._ensure_sequence()
     File "/usr/local/lib/python3.8/site-packages/werkzeug/wrappers/response.py", line 376, in _ensure_sequence
       self.make_sequence()
     File "/usr/local/lib/python3.8/site-packages/werkzeug/wrappers/response.py", line 391, in make_sequence
       self.response = list(self.iter_encoded())
     File "/usr/local/lib/python3.8/site-packages/werkzeug/wrappers/response.py", line 50, in _iter_encoded
       for item in iterable:
     File "/opt/airflow/airflow/utils/log/log_reader.py", line 85, in read_log_stream
       logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
     File "/opt/airflow/airflow/utils/log/log_reader.py", line 62, in read_log_chunks
       logs, metadatas = self.log_handler.read(ti, try_number, metadata=metadata)
     File "/opt/airflow/airflow/utils/log/file_task_handler.py", line 412, in read
       log, out_metadata = self._read(task_instance, try_number_element, metadata)
     File "/opt/airflow/airflow/providers/elasticsearch/log/es_task_handler.py", line 238, in _read
       logs = self.es_read(log_id, offset, metadata)
     File "/opt/airflow/airflow/providers/elasticsearch/log/es_task_handler.py", line 330, in es_read
       raise e
     File "/opt/airflow/airflow/providers/elasticsearch/log/es_task_handler.py", line 324, in es_read
       max_log_line = self.client.count(index=self.index_patterns, body=query)["count"]
     File "/usr/local/lib/python3.8/site-packages/elasticsearch/client/utils.py", line 168, in _wrapped
       return func(*args, params=params, headers=headers, **kwargs)
     File "/usr/local/lib/python3.8/site-packages/elasticsearch/client/__init__.py", line 549, in count
       return self.transport.perform_request(
     File "/usr/local/lib/python3.8/site-packages/elasticsearch/transport.py", line 458, in perform_request
       raise e
     File "/usr/local/lib/python3.8/site-packages/elasticsearch/transport.py", line 419, in perform_request
       status, headers_response, data = connection.perform_request(
     File "/usr/local/lib/python3.8/site-packages/elasticsearch/connection/http_urllib3.py", line 277, in perform_request
       self._raise_error(response.status, raw_data)
     File "/usr/local/lib/python3.8/site-packages/elasticsearch/connection/base.py", line 330, in _raise_error
       raise HTTP_EXCEPTIONS.get(status_code, TransportError)(
   elasticsearch.exceptions.RequestError: RequestError(400, 'parsing_exception', 'request does not support [sort]')
   172.20.0.1 - - [06/Jul/2023:12:48:45 +0000] "GET /api/v1/dags/example_xcom/dagRuns/scheduled__2023-07-04T00:00:00+00:00/taskInstances/push/logs/1?full_content=false HTTP/1.1" 500 1543 "http://localhost:28080/dags/example_xcom/grid?root=&tab=logs&dag_run_id=scheduled__2023-07-04T00%3A00%3A00%2B00%3A00&task_id=push" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
   [2023-07-06T12:48:46.456+0000] {base.py:270} INFO - GET http://host.docker.internal:9200/ [status:200 request:0.013s]
   [2023-07-06T12:48:46.458+0000] {base.py:293} WARNING - POST http://host.docker.internal:9200/_all/_count [status:400 request:0.002s]
   [2023-07-06T12:48:46.459+0000] {es_task_handler.py:329} ERROR - Could not get current log size with log_id: example_xcom-push-scheduled__2023-07-04T00:00:00+00:00--1-1
   (... the same two tracebacks repeat verbatim for the retry at 12:48:46, ending again with RequestError(400, 'parsing_exception', 'request does not support [sort]') ...)
   ```
   
   Do I need to change something in my `airflow.cfg`, or are extra steps needed to make ES remote logging work with the changes here?
   
   I am also wondering why it makes a POST to http://host.docker.internal:9200/_all/_count and gets a 400 response. Should that not be a GET request?
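
   For what it's worth, the `parsing_exception` suggests the full search body (including the `sort` clause) is being passed to the `_count` endpoint, which only accepts a `query` clause. A minimal sketch of the shape of the problem (the function name and the query contents are illustrative, not the provider's actual code):
   
   ```python
   # Hypothetical sketch: the _count API rejects search-only keys such as
   # "sort", so stripping them before calling client.count() should avoid
   # the parsing_exception seen above.
   
   def to_count_body(search_body: dict) -> dict:
       """Keep only the "query" clause, which is all that _count accepts."""
       return {"query": search_body["query"]} if "query" in search_body else {}
   
   search_body = {
       "query": {"match_all": {}},  # illustrative; not the real log_id filter
       "sort": [{"log.offset": {"order": "asc"}}],  # fine for _search, rejected by _count
   }
   
   count_body = to_count_body(search_body)
   print(count_body)  # the "sort" key is gone, only "query" remains
   ```
   
   As for POST vs GET: as far as I can tell, the elasticsearch-py client sends `_count` as a POST whenever a request body is supplied, so the POST itself looks expected; the 400 seems to be caused by the body contents, not the method.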

