jedcunningham commented on code in PR #38423:
URL: https://github.com/apache/airflow/pull/38423#discussion_r1552007608
##########
airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -372,12 +392,15 @@ def _format_msg(self, hit: Hit):
# Just a safe-guard to preserve backwards-compatibility
return hit.message
- def _es_read(self, log_id: str, offset: int | str) -> ElasticSearchResponse | None:
+ def _es_read(
+ self, log_id: str, offset: int | str, ti: TaskInstance | None
Review Comment:
```suggestion
self, log_id: str, offset: int | str, ti: TaskInstance
```
Don't we always have a ti?
##########
airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -153,6 +154,9 @@ def __init__(
host: str = "http://localhost:9200",
frontend: str = "localhost:5601",
index_patterns: str | None = conf.get("elasticsearch", "index_patterns", fallback="_all"),
Review Comment:
```suggestion
index_patterns: str = conf.get("elasticsearch", "index_patterns"),
```
Might help with typing checks, since this can't actually be `None`.
##########
airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -213,6 +218,21 @@ def format_url(host: str) -> str:
return host
+ def _get_index_patterns(self, ti: TaskInstance | None) -> str | None:
+ """
+ Get index patterns by calling index_patterns_callable or None.
+
+ :param ti: A TaskInstance object or None.
+ """
+ if self.index_patterns_callable:
+ self.log.debug("Using %s index_patterns_callable", self.index_patterns_callable)
+ module_path, index_pattern_function = self.index_patterns_callable.rsplit(".", 1)
+ module = importlib.import_module(module_path)
+ index_pattern_callable_obj = getattr(module, index_pattern_function)
+ return index_pattern_callable_obj(ti)
Review Comment:
We are no longer handling exceptions here?
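For illustration only, here is a rough sketch of what guarding the lookup could look like. It is a standalone function rather than the handler method, and the name `resolve_index_patterns`, its parameters, and the choice to fall back to the configured pattern (instead of re-raising) are all my assumptions, not what the previous code did.

```python
import importlib
import logging

log = logging.getLogger(__name__)


def resolve_index_patterns(callable_path, default_patterns, ti=None):
    """Return patterns from a dotted-path callable, else the configured default.

    Hypothetical helper: only the import/lookup step is guarded, so errors
    raised by the user's callable itself still propagate.
    """
    if not callable_path:
        return default_patterns
    try:
        # A path without a dot makes the unpacking raise ValueError.
        module_path, func_name = callable_path.rsplit(".", 1)
        func = getattr(importlib.import_module(module_path), func_name)
    except (ValueError, ImportError, AttributeError):
        log.exception("Could not load index_patterns_callable %r", callable_path)
        return default_patterns
    return func(ti)
```

Falling back keeps log reads working with a misconfigured callable, at the cost of hiding the misconfiguration behind a log line. Re-raising with a clearer message would be the stricter option.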
##########
airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -213,6 +218,21 @@ def format_url(host: str) -> str:
return host
+ def _get_index_patterns(self, ti: TaskInstance | None) -> str | None:
+ """
+ Get index patterns by calling index_patterns_callable or None.
+
+ :param ti: A TaskInstance object or None.
+ """
+ if self.index_patterns_callable:
+ self.log.debug("Using %s index_patterns_callable", self.index_patterns_callable)
+ module_path, index_pattern_function = self.index_patterns_callable.rsplit(".", 1)
+ module = importlib.import_module(module_path)
+ index_pattern_callable_obj = getattr(module, index_pattern_function)
+ return index_pattern_callable_obj(ti)
+ self.log.debug("Using %s index_patterns", self.index_patterns)
Review Comment:
If we keep this log, we should probably reword it.
##########
airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -213,6 +218,21 @@ def format_url(host: str) -> str:
return host
+ def _get_index_patterns(self, ti: TaskInstance | None) -> str | None:
Review Comment:
```suggestion
def _get_index_patterns(self, ti: TaskInstance | None) -> str:
```
Won't this always be a string now?
##########
airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -213,6 +218,21 @@ def format_url(host: str) -> str:
return host
+ def _get_index_patterns(self, ti: TaskInstance | None) -> str | None:
+ """
+ Get index patterns by calling index_patterns_callable or None.
Review Comment:
```suggestion
Get index patterns by calling index_patterns_callable, if provided,
or the configured `index_patterns`.
```
Or similar.