pankajkoti commented on code in PR #33135:
URL: https://github.com/apache/airflow/pull/33135#discussion_r1285850145
##########
airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -101,8 +101,8 @@ def __init__(
super().__init__(base_log_folder, filename_template)
self.closed = False
- self.client = elasticsearch.Elasticsearch(host.split(";"),
**es_kwargs) # type: ignore[attr-defined]
-
+ self.client = elasticsearch.Elasticsearch(host, **es_kwargs) # type:
ignore[attr-defined]
+ # in airflow.cfg, host of elasticsearch has to be
http://dockerhostXxxx:9200
Review Comment:
May I know what error we see if the protocol (e.g. `http://`) is not included
in the configured host value?
##########
airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -292,27 +292,24 @@ def es_read(self, log_id: str, offset: int | str,
metadata: dict) -> list | Elas
}
try:
- max_log_line = self.client.count(index=self.index_patterns,
body=query)["count"]
+ max_log_line = self.client.count(index=self.index_patterns,
body=query)["count"] # type: ignore
except NotFoundError as e:
self.log.exception("The target index pattern %s does not exist",
self.index_patterns)
raise e
- except ElasticsearchException as e:
- self.log.exception("Could not get current log size with log_id:
%s", log_id)
- raise e
logs: list[Any] | ElasticSearchResponse = []
if max_log_line != 0:
try:
query.update({"sort": [self.offset_field]})
- res = self.client.search(
+ res = self.client.search( # type: ignore
Review Comment:
Same question regarding the `# type: ignore` here.
##########
airflow/providers/elasticsearch/CHANGELOG.rst:
##########
@@ -27,6 +27,17 @@
Changelog
---------
+5.1.0
Review Comment:
```suggestion
6.0.0
```
Since we are introducing a breaking change, we need a major version bump.
##########
airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -292,27 +292,24 @@ def es_read(self, log_id: str, offset: int | str,
metadata: dict) -> list | Elas
}
try:
- max_log_line = self.client.count(index=self.index_patterns,
body=query)["count"]
+ max_log_line = self.client.count(index=self.index_patterns,
body=query)["count"] # type: ignore
except NotFoundError as e:
self.log.exception("The target index pattern %s does not exist",
self.index_patterns)
raise e
- except ElasticsearchException as e:
- self.log.exception("Could not get current log size with log_id:
%s", log_id)
- raise e
logs: list[Any] | ElasticSearchResponse = []
if max_log_line != 0:
try:
query.update({"sort": [self.offset_field]})
- res = self.client.search(
+ res = self.client.search( # type: ignore
index=self.index_patterns,
body=query,
size=self.MAX_LINE_PER_PAGE,
from_=self.MAX_LINE_PER_PAGE * self.PAGE,
)
logs = ElasticSearchResponse(self, res)
- except elasticsearch.exceptions.ElasticsearchException:
- self.log.exception("Could not read log with log_id: %s",
log_id)
+ except Exception as err:
Review Comment:
Can we not narrow down the exception we catch? Is the previous exception
no longer present? If so, have they added another similar class, and can we
use that instead?
Catching such a broad exception without re-raising it might lead to silent
failures.
##########
airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -292,27 +292,24 @@ def es_read(self, log_id: str, offset: int | str,
metadata: dict) -> list | Elas
}
try:
- max_log_line = self.client.count(index=self.index_patterns,
body=query)["count"]
+ max_log_line = self.client.count(index=self.index_patterns,
body=query)["count"] # type: ignore
Review Comment:
Why do we need a `# type: ignore` here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]