pankajkoti commented on PR #31920:
URL: https://github.com/apache/airflow/pull/31920#issuecomment-1623629476
hi,
I am trying to use Elasticsearch remote logging with Airflow Breeze locally.
The following `airflow.cfg` configuration used to work for me previously
(apache-airflow-providers-elasticsearch==4.5.1) to fetch and render remote logs
in the webserver UI task logs:
```
[logging]
remote_logging = True
[elasticsearch]
json_format = True
host = host.docker.internal:9200
host_field = host.name
offset_field = log.offset
```
However, on main (I think about to be released provider with this change), I
am getting the below error when the webserver tries to fetch the logs from ES,
```
[2023-07-06T12:48:45.441+0000] {base.py:293} WARNING - POST
http://host.docker.internal:9200/_all/_count [status:400 request:0.005s]
[2023-07-06T12:48:45.441+0000] {es_task_handler.py:329} ERROR - Could not
get current log size with log_id:
example_xcom-push-scheduled__2023-07-04T00:00:00+00:00--1-1
Traceback (most recent call last):
File
"/opt/airflow/airflow/providers/elasticsearch/log/es_task_handler.py", line
324, in es_read
max_log_line = self.client.count(index=self.index_patterns,
body=query)["count"]
File
"/usr/local/lib/python3.8/site-packages/elasticsearch/client/utils.py", line
168, in _wrapped
return func(*args, params=params, headers=headers, **kwargs)
File
"/usr/local/lib/python3.8/site-packages/elasticsearch/client/__init__.py", line
549, in count
return self.transport.perform_request(
File "/usr/local/lib/python3.8/site-packages/elasticsearch/transport.py",
line 458, in perform_request
raise e
File "/usr/local/lib/python3.8/site-packages/elasticsearch/transport.py",
line 419, in perform_request
status, headers_response, data = connection.perform_request(
File
"/usr/local/lib/python3.8/site-packages/elasticsearch/connection/http_urllib3.py",
line 277, in perform_request
self._raise_error(response.status, raw_data)
File
"/usr/local/lib/python3.8/site-packages/elasticsearch/connection/base.py", line
330, in _raise_error
raise HTTP_EXCEPTIONS.get(status_code, TransportError)(
elasticsearch.exceptions.RequestError: RequestError(400,
'parsing_exception', 'request does not support [sort]')
[2023-07-06T12:48:45.446+0000] {app.py:1744} ERROR - Exception on
/api/v1/dags/example_xcom/dagRuns/scheduled__2023-07-04T00:00:00+00:00/taskInstances/push/logs/1
[GET]
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 2529, in
wsgi_app
response = self.full_dispatch_request()
File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1825, in
full_dispatch_request
rv = self.handle_user_exception(e)
File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1823, in
full_dispatch_request
rv = self.dispatch_request()
File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1799, in
dispatch_request
return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
File
"/usr/local/lib/python3.8/site-packages/connexion/decorators/decorator.py",
line 68, in wrapper
response = function(request)
File
"/usr/local/lib/python3.8/site-packages/connexion/decorators/uri_parsing.py",
line 149, in wrapper
response = function(request)
File
"/usr/local/lib/python3.8/site-packages/connexion/decorators/validation.py",
line 399, in wrapper
return function(request)
File
"/usr/local/lib/python3.8/site-packages/connexion/decorators/response.py", line
113, in wrapper
return _wrapper(request, response)
File
"/usr/local/lib/python3.8/site-packages/connexion/decorators/response.py", line
90, in _wrapper
self.operation.api.get_connexion_response(response, self.mimetype)
File "/usr/local/lib/python3.8/site-packages/connexion/apis/abstract.py",
line 366, in get_connexion_response
return cls._framework_to_connexion_response(response=response,
mimetype=mimetype)
File "/usr/local/lib/python3.8/site-packages/connexion/apis/flask_api.py",
line 165, in _framework_to_connexion_response
body=response.get_data() if not response.direct_passthrough else None,
File
"/usr/local/lib/python3.8/site-packages/werkzeug/wrappers/response.py", line
314, in get_data
self._ensure_sequence()
File
"/usr/local/lib/python3.8/site-packages/werkzeug/wrappers/response.py", line
376, in _ensure_sequence
self.make_sequence()
File
"/usr/local/lib/python3.8/site-packages/werkzeug/wrappers/response.py", line
391, in make_sequence
self.response = list(self.iter_encoded())
File
"/usr/local/lib/python3.8/site-packages/werkzeug/wrappers/response.py", line
50, in _iter_encoded
for item in iterable:
File "/opt/airflow/airflow/utils/log/log_reader.py", line 85, in
read_log_stream
logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
File "/opt/airflow/airflow/utils/log/log_reader.py", line 62, in
read_log_chunks
logs, metadatas = self.log_handler.read(ti, try_number,
metadata=metadata)
File "/opt/airflow/airflow/utils/log/file_task_handler.py", line 412, in
read
log, out_metadata = self._read(task_instance, try_number_element,
metadata)
File
"/opt/airflow/airflow/providers/elasticsearch/log/es_task_handler.py", line
238, in _read
logs = self.es_read(log_id, offset, metadata)
File
"/opt/airflow/airflow/providers/elasticsearch/log/es_task_handler.py", line
330, in es_read
raise e
File
"/opt/airflow/airflow/providers/elasticsearch/log/es_task_handler.py", line
324, in es_read
max_log_line = self.client.count(index=self.index_patterns,
body=query)["count"]
File
"/usr/local/lib/python3.8/site-packages/elasticsearch/client/utils.py", line
168, in _wrapped
return func(*args, params=params, headers=headers, **kwargs)
File
"/usr/local/lib/python3.8/site-packages/elasticsearch/client/__init__.py", line
549, in count
return self.transport.perform_request(
File "/usr/local/lib/python3.8/site-packages/elasticsearch/transport.py",
line 458, in perform_request
raise e
File "/usr/local/lib/python3.8/site-packages/elasticsearch/transport.py",
line 419, in perform_request
status, headers_response, data = connection.perform_request(
File
"/usr/local/lib/python3.8/site-packages/elasticsearch/connection/http_urllib3.py",
line 277, in perform_request
self._raise_error(response.status, raw_data)
File
"/usr/local/lib/python3.8/site-packages/elasticsearch/connection/base.py", line
330, in _raise_error
raise HTTP_EXCEPTIONS.get(status_code, TransportError)(
elasticsearch.exceptions.RequestError: RequestError(400,
'parsing_exception', 'request does not support [sort]')
172.20.0.1 - - [06/Jul/2023:12:48:45 +0000] "GET
/api/v1/dags/example_xcom/dagRuns/scheduled__2023-07-04T00:00:00+00:00/taskInstances/push/logs/1?full_content=false
HTTP/1.1" 500 1543
"http://localhost:28080/dags/example_xcom/grid?root=&tab=logs&dag_run_id=scheduled__2023-07-04T00%3A00%3A00%2B00%3A00&task_id=push"
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML,
like Gecko) Chrome/114.0.0.0 Safari/537.36"
[2023-07-06T12:48:46.456+0000] {base.py:270} INFO - GET
http://host.docker.internal:9200/ [status:200 request:0.013s]
[2023-07-06T12:48:46.458+0000] {base.py:293} WARNING - POST
http://host.docker.internal:9200/_all/_count [status:400 request:0.002s]
[2023-07-06T12:48:46.459+0000] {es_task_handler.py:329} ERROR - Could not
get current log size with log_id:
example_xcom-push-scheduled__2023-07-04T00:00:00+00:00--1-1
Traceback (most recent call last):
File
"/opt/airflow/airflow/providers/elasticsearch/log/es_task_handler.py", line
324, in es_read
max_log_line = self.client.count(index=self.index_patterns,
body=query)["count"]
File
"/usr/local/lib/python3.8/site-packages/elasticsearch/client/utils.py", line
168, in _wrapped
return func(*args, params=params, headers=headers, **kwargs)
File
"/usr/local/lib/python3.8/site-packages/elasticsearch/client/__init__.py", line
549, in count
return self.transport.perform_request(
File "/usr/local/lib/python3.8/site-packages/elasticsearch/transport.py",
line 458, in perform_request
raise e
File "/usr/local/lib/python3.8/site-packages/elasticsearch/transport.py",
line 419, in perform_request
status, headers_response, data = connection.perform_request(
File
"/usr/local/lib/python3.8/site-packages/elasticsearch/connection/http_urllib3.py",
line 277, in perform_request
self._raise_error(response.status, raw_data)
File
"/usr/local/lib/python3.8/site-packages/elasticsearch/connection/base.py", line
330, in _raise_error
raise HTTP_EXCEPTIONS.get(status_code, TransportError)(
elasticsearch.exceptions.RequestError: RequestError(400,
'parsing_exception', 'request does not support [sort]')
[2023-07-06T12:48:46.464+0000] {app.py:1744} ERROR - Exception on
/api/v1/dags/example_xcom/dagRuns/scheduled__2023-07-04T00:00:00+00:00/taskInstances/push/logs/1
[GET]
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 2529, in
wsgi_app
response = self.full_dispatch_request()
File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1825, in
full_dispatch_request
rv = self.handle_user_exception(e)
File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1823, in
full_dispatch_request
rv = self.dispatch_request()
File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1799, in
dispatch_request
return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
File
"/usr/local/lib/python3.8/site-packages/connexion/decorators/decorator.py",
line 68, in wrapper
response = function(request)
File
"/usr/local/lib/python3.8/site-packages/connexion/decorators/uri_parsing.py",
line 149, in wrapper
response = function(request)
File
"/usr/local/lib/python3.8/site-packages/connexion/decorators/validation.py",
line 399, in wrapper
return function(request)
File
"/usr/local/lib/python3.8/site-packages/connexion/decorators/response.py", line
113, in wrapper
return _wrapper(request, response)
File
"/usr/local/lib/python3.8/site-packages/connexion/decorators/response.py", line
90, in _wrapper
self.operation.api.get_connexion_response(response, self.mimetype)
File "/usr/local/lib/python3.8/site-packages/connexion/apis/abstract.py",
line 366, in get_connexion_response
return cls._framework_to_connexion_response(response=response,
mimetype=mimetype)
File "/usr/local/lib/python3.8/site-packages/connexion/apis/flask_api.py",
line 165, in _framework_to_connexion_response
body=response.get_data() if not response.direct_passthrough else None,
File
"/usr/local/lib/python3.8/site-packages/werkzeug/wrappers/response.py", line
314, in get_data
self._ensure_sequence()
File
"/usr/local/lib/python3.8/site-packages/werkzeug/wrappers/response.py", line
376, in _ensure_sequence
self.make_sequence()
File
"/usr/local/lib/python3.8/site-packages/werkzeug/wrappers/response.py", line
391, in make_sequence
self.response = list(self.iter_encoded())
File
"/usr/local/lib/python3.8/site-packages/werkzeug/wrappers/response.py", line
50, in _iter_encoded
for item in iterable:
File "/opt/airflow/airflow/utils/log/log_reader.py", line 85, in
read_log_stream
logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
File "/opt/airflow/airflow/utils/log/log_reader.py", line 62, in
read_log_chunks
logs, metadatas = self.log_handler.read(ti, try_number,
metadata=metadata)
File "/opt/airflow/airflow/utils/log/file_task_handler.py", line 412, in
read
log, out_metadata = self._read(task_instance, try_number_element,
metadata)
File
"/opt/airflow/airflow/providers/elasticsearch/log/es_task_handler.py", line
238, in _read
logs = self.es_read(log_id, offset, metadata)
File
"/opt/airflow/airflow/providers/elasticsearch/log/es_task_handler.py", line
330, in es_read
raise e
File
"/opt/airflow/airflow/providers/elasticsearch/log/es_task_handler.py", line
324, in es_read
max_log_line = self.client.count(index=self.index_patterns,
body=query)["count"]
File
"/usr/local/lib/python3.8/site-packages/elasticsearch/client/utils.py", line
168, in _wrapped
return func(*args, params=params, headers=headers, **kwargs)
File
"/usr/local/lib/python3.8/site-packages/elasticsearch/client/__init__.py", line
549, in count
return self.transport.perform_request(
File "/usr/local/lib/python3.8/site-packages/elasticsearch/transport.py",
line 458, in perform_request
raise e
File "/usr/local/lib/python3.8/site-packages/elasticsearch/transport.py",
line 419, in perform_request
status, headers_response, data = connection.perform_request(
File
"/usr/local/lib/python3.8/site-packages/elasticsearch/connection/http_urllib3.py",
line 277, in perform_request
self._raise_error(response.status, raw_data)
File
"/usr/local/lib/python3.8/site-packages/elasticsearch/connection/base.py", line
330, in _raise_error
raise HTTP_EXCEPTIONS.get(status_code, TransportError)(
elasticsearch.exceptions.RequestError: RequestError(400,
'parsing_exception', 'request does not support [sort]')
```
Do I need to change my `airflow.cfg` to something else, or are extra steps
needed to make ES remote logging work with the changes here?
I am also wondering why it is making a POST request to
http://host.docker.internal:9200/_all/_count and receiving a 400 response.
Should it not be a GET request?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]