jedcunningham commented on code in PR #38423:
URL: https://github.com/apache/airflow/pull/38423#discussion_r1551795357
##########
airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -372,12 +397,15 @@ def _format_msg(self, hit: Hit):
# Just a safe-guard to preserve backwards-compatibility
return hit.message
- def _es_read(self, log_id: str, offset: int | str) -> ElasticSearchResponse | None:
+ def _es_read(
+ self, log_id: str, offset: int | str, ti: TaskInstance | None = None
Review Comment:
Since this is private, let's not let TI be optional.
```suggestion
self, log_id: str, offset: int | str, ti: TaskInstance | None
```
Can TI even be None?
##########
airflow/providers/elasticsearch/provider.yaml:
##########
@@ -158,10 +158,19 @@ config:
index_patterns:
description: |
Comma separated list of index patterns to use when searching for
logs (default: `_all`).
+ The index_patterns_callable takes precedence over this.
version_added: 2.6.0
type: string
example: something-*
default: "_all"
+ index_patterns_callable:
+ description: |
+ A string representing the full path to the Python callable which accepts a TI object and
+ returns a comma-separated list of index patterns. This takes precedence over index_patterns.
+ version_added: 2.9.0
Review Comment:
Not core version. ES provider version. Probably 5.4?
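For reference, a callable matching the `index_patterns_callable` description above might look like this sketch. The function name and the index naming scheme are hypothetical; a `SimpleNamespace` stands in for a real `TaskInstance`, and only `ti.dag_id` is assumed to be available.

```python
from types import SimpleNamespace
from typing import Any


def get_index_patterns(ti: Any) -> str:
    """Hypothetical ``index_patterns_callable`` target.

    Receives the TaskInstance and returns a comma-separated string of
    Elasticsearch index patterns.
    """
    # Assumed naming scheme: one index family per DAG plus a shared fallback.
    return f"airflow-{ti.dag_id}-*,airflow-shared-*"


if __name__ == "__main__":
    # SimpleNamespace stands in for a real TaskInstance; only dag_id is used.
    fake_ti = SimpleNamespace(dag_id="example_dag", task_id="extract")
    print(get_index_patterns(fake_ti))  # airflow-example_dag-*,airflow-shared-*
```

It would then be referenced by its dotted path, e.g. `index_patterns_callable = my_company.es_logging.get_index_patterns` under `[elasticsearch]`, where `my_company.es_logging` is a hypothetical module that must be importable by whichever component reads the logs.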
##########
airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -388,16 +416,17 @@ def _es_read(self, log_id: str, offset: int | str) -> ElasticSearchResponse | No
}
}
+ index_patterns = _get_index_patterns(self.index_patterns_callable, ti) or self.index_patterns
Review Comment:
It might be a little nicer to have _get_index_patterns be a method instead
so the fallback to index_patterns can happen there. Keeps the logic of callable
vs original config in 1 spot.
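A minimal sketch of that suggestion, assuming the handler keeps `index_patterns` and `index_patterns_callable` as attributes as in this PR. `IndexPatternResolver` is a hypothetical stand-in for the handler class, `ti` is required (per the other comments) and typed as `Any` only to avoid importing Airflow. Falling back when the callable returns an empty value preserves the current `... or self.index_patterns` behaviour.

```python
from __future__ import annotations

import importlib
from types import SimpleNamespace
from typing import Any


class IndexPatternResolver:
    """Hypothetical stand-in for the ES task handler (only the relevant attributes)."""

    def __init__(self, index_patterns: str, index_patterns_callable: str | None = None) -> None:
        self.index_patterns = index_patterns
        self.index_patterns_callable = index_patterns_callable

    def _get_index_patterns(self, ti: Any) -> str:
        """Return patterns from the configured callable, else the static ``index_patterns``."""
        if self.index_patterns_callable:
            # "package.module.func" -> import "package.module", then look up "func".
            module_path, func_name = self.index_patterns_callable.rsplit(".", 1)
            func = getattr(importlib.import_module(module_path), func_name)
            patterns = func(ti)
            if patterns:
                return patterns
        # No callable configured, or it returned an empty value: use the config option.
        return self.index_patterns


if __name__ == "__main__":
    fake_ti = SimpleNamespace(dag_id="example_dag")
    print(IndexPatternResolver("_all")._get_index_patterns(fake_ti))  # _all
```

The call site in `_es_read` would then reduce to `index_patterns = self._get_index_patterns(ti)`, with all callable-vs-config logic in one method.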
##########
airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -114,6 +115,26 @@ def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
raise AirflowException(f"Could not find TaskInstance for {ti}")
+def _get_index_patterns(
+ index_patterns_callable: str | None = None, ti: TaskInstance | None = None
Review Comment:
```suggestion
index_patterns_callable: str | None, ti: TaskInstance
```
I don't think we need to make these optional args.
##########
airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -114,6 +115,26 @@ def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
raise AirflowException(f"Could not find TaskInstance for {ti}")
+def _get_index_patterns(
+ index_patterns_callable: str | None = None, ti: TaskInstance | None = None
+) -> str | None:
+ """
+ Get index patterns by calling index_patterns_callable or None.
+
+ :param index_patterns_callable: A string representing the full path to the callable itself.
+ :param ti: A TaskInstance object or None.
+ """
+ if not index_patterns_callable:
+ return None
+ try:
+ module_path, index_pattern_function = index_patterns_callable.rsplit(".", 1)
+ module = importlib.import_module(module_path)
+ index_pattern_callable_obj = getattr(module, index_pattern_function)
+ return index_pattern_callable_obj(ti)
+ except Exception:
+ return None
Review Comment:
Happy to return None here, but I don't think we should swallow the
exception. If something does fail, it'll be very hard to determine why the
expected index_patterns from the callable wasn't used.
--
This is an automated message from the Apache Git Service.