jedcunningham commented on code in PR #38423:
URL: https://github.com/apache/airflow/pull/38423#discussion_r1551795357


##########
airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -372,12 +397,15 @@ def _format_msg(self, hit: Hit):
         # Just a safe-guard to preserve backwards-compatibility
         return hit.message
 
-    def _es_read(self, log_id: str, offset: int | str) -> 
ElasticSearchResponse | None:
+    def _es_read(
+        self, log_id: str, offset: int | str, ti: TaskInstance | None = None

Review Comment:
   Since this is private, lets not let TI be optional.
   ```suggestion
           self, log_id: str, offset: int | str, ti: TaskInstance | None
   ```
   
   Can TI even be none?



##########
airflow/providers/elasticsearch/provider.yaml:
##########
@@ -158,10 +158,19 @@ config:
       index_patterns:
         description: |
           Comma separated list of index patterns to use when searching for 
logs (default: `_all`).
+          The index_patterns_callable takes precedence over this.
         version_added: 2.6.0
         type: string
         example: something-*
         default: "_all"
+      index_patterns_callable:
+        description: |
+          A string representing the full path to the Python callable path 
which accept TI object and
+          return comma separated list of index patterns. This will takes 
precedence over index_patterns.
+        version_added: 2.9.0

Review Comment:
   Not core version. ES provider version. Probably 5.4?



##########
airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -388,16 +416,17 @@ def _es_read(self, log_id: str, offset: int | str) -> 
ElasticSearchResponse | No
             }
         }
 
+        index_patterns = _get_index_patterns(self.index_patterns_callable, ti) 
or self.index_patterns

Review Comment:
   It might be a little nicer to have _get_index_patterns be a method instead 
so the fallback to index_patterns can happen there. Keeps the logic of callable 
vs original config in 1 spot.



##########
airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -114,6 +115,26 @@ def _ensure_ti(ti: TaskInstanceKey | TaskInstance, 
session) -> TaskInstance:
         raise AirflowException(f"Could not find TaskInstance for {ti}")
 
 
+def _get_index_patterns(
+    index_patterns_callable: str | None = None, ti: TaskInstance | None = None

Review Comment:
   ```suggestion
       index_patterns_callable: str | None, ti: TaskInstance
   ```
   
   I don't think we need to make these optional args.



##########
airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -114,6 +115,26 @@ def _ensure_ti(ti: TaskInstanceKey | TaskInstance, 
session) -> TaskInstance:
         raise AirflowException(f"Could not find TaskInstance for {ti}")
 
 
+def _get_index_patterns(
+    index_patterns_callable: str | None = None, ti: TaskInstance | None = None
+) -> str | None:
+    """
+    Get index patterns by calling index_patterns_callable or None.
+
+    :param index_patterns_callable: A string representing the full path to the 
callable itself.
+    :param ti: A TaskInstance object or None.
+    """
+    if not index_patterns_callable:
+        return None
+    try:
+        module_path, index_pattern_function = 
index_patterns_callable.rsplit(".", 1)
+        module = importlib.import_module(module_path)
+        index_pattern_callable_obj = getattr(module, index_pattern_function)
+        return index_pattern_callable_obj(ti)
+    except Exception:
+        return None

Review Comment:
   Happy to return None here, but I don't think we should swallow the 
exception. If something does fail, it'll be very hard to determine why the 
expected index_patterns from the callable wasn't used.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to