krisfur commented on code in PR #34627:
URL: https://github.com/apache/airflow/pull/34627#discussion_r1339757940


##########
airflow/providers/microsoft/azure/operators/container_instances.py:
##########
@@ -319,6 +319,9 @@ def _monitor_logging(self, resource_group: str, name: str) 
-> int:
                 if state in ["Running", "Terminated", "Succeeded"]:
                     try:
                         logs = self._ci_hook.get_logs(resource_group, name)
+                        if logs is None:
+                            self.log.exception("Container log is broken, 
marking as failed.")

Review Comment:
   Interesting! Looking at `execute`: if that function were rerun for any reason, we'd see another "Container group started" message, but the only new messages when the bug happens are the container state changing to waiting/pending. This suggests that `monitor_logging` never completes — it just throws errors and then correctly continues working, reporting the state as having changed.
   
   Interestingly, when we run the same code that induces the log failure in Azure containers deployed manually (instead of through Airflow), they did not restart — which could mean either that it's an Airflow-side issue, **or** that Azure, when asked to provide a log that is None, sets itself back to waiting/pending, in which case it can't be solved on the Airflow side (i.e. an Azure-side issue triggered when you try to grab a broken log, which we can't really debug).
   
   Our deployment setup: deploying an azure container instance based on a 
simple docker image (which just has a pyenv with libraries and the script to 
run), our airflow is hosted on an azure VM in a virtual environment.
   
   DAG used for our crash-inducing test run:
   ```
   import airflow
   import datetime
   from isodate import DT_BAS_COMPLETE
   from lib.default_settings import defaults, creds
   from airflow.decorators import task
   import json
   import airflow.providers.microsoft.azure.operators.azure_container_instances 
as ACI
   
   
   class CaptureACIOperator(ACI.AzureContainerInstancesOperator):
       """
       Capture expansion for ACI operator
       custom_args are used for rendering ACI kwargs:
       map_index: index of expansion
       config: config for this mapped instance
       output: where to output (standard format 
{account_url}/{container}/{prefix}/{project}/{date})
       date: date of run
       project: self explanatory
       capture: not used here
       """
       def __init__(self, **kwargs):
           command = """
           source /root/pyenv/bin/activate
           python /root/ext/ds-models/bin/run_log_test.py
           CAPEXIT=$?
           echo "Exit code $CAPEXIT"
           exit $CAPEXIT
           """
           kwargs["command"] = ["/bin/bash", "-c", command]
           kwargs[
               "name"
           ] = f"ds-ci-log-test"
           
           super().__init__(**kwargs)
   
   with airflow.DAG('log_test', description="",
           default_args=defaults.default_none, schedule_interval='0 0 * * 3', 
start_date=datetime.datetime(2022, 1, 1),
       catchup=False,tags=["development"]) as dag:
   
       t1 = CaptureACIOperator(
           dag=dag,
           task_id="log_test",
           image="asadsregistry.azurecr.io/dsmodel:log_test",
           ci_conn_id="azure-ds-ci-airflow-dspython",
           registry_conn_id="azure-asadsregistry",
           resource_group="Machine-Learning",
           region="UK South",
           memory_in_gb=8,
           cpu=4,
           pool="ci_cpus",
           pool_slots=4,
       )
   ```
   
   The script to induce failures (just multiple TQDM progress bars on any code 
that takes a bit to run):
   ```
   import re
   import pandas as pd
   import phonenumbers
   import tldextract
   from sentence_transformers import SentenceTransformer
   from urlextract import URLExtract
   import logging
   from tqdm.auto import tqdm
   
   tqdm.pandas()
   
   
   def find_phone_numbers(text):
       uk_numbers = []
       numbers = []
       exclusive_uk = []
       exclusive_foreign = []
       for match in phonenumbers.PhoneNumberMatcher(text, "GB"):
           uk_numbers.append(
               phonenumbers.format_number(
                   match.number, phonenumbers.PhoneNumberFormat.NATIONAL
               )
           )
       for match in phonenumbers.PhoneNumberMatcher(text, "US"):
           numbers.append(
               phonenumbers.format_number(
                   match.number, phonenumbers.PhoneNumberFormat.NATIONAL
               )
           )
       for match in phonenumbers.PhoneNumberMatcher(text, "AU"):
           numbers.append(
               phonenumbers.format_number(
                   match.number, phonenumbers.PhoneNumberFormat.NATIONAL
               )
           )
       for match in phonenumbers.PhoneNumberMatcher(text, "NZ"):
           numbers.append(
               phonenumbers.format_number(
                   match.number, phonenumbers.PhoneNumberFormat.NATIONAL
               )
           )
       for match in phonenumbers.PhoneNumberMatcher(text, "CA"):
           numbers.append(
               phonenumbers.format_number(
                   match.number, phonenumbers.PhoneNumberFormat.NATIONAL
               )
           )
       for match in phonenumbers.PhoneNumberMatcher(text, "ZA"):
           numbers.append(
               phonenumbers.format_number(
                   match.number, phonenumbers.PhoneNumberFormat.NATIONAL
               )
           )
   
       exclusive_uk = [x for x in uk_numbers if x not in numbers]
       exclusive_foreign = [x for x in numbers if x not in uk_numbers]
   
       return pd.Series(
           [
               list(set(uk_numbers)),
               list(set(numbers)),
               list(set(exclusive_uk)),
               list(set(exclusive_foreign)),
           ]
       )
   
   
   def find_urls(text):
       uk_urls = []
       foreign_urls = []
       extractor = URLExtract()
       links = list(set(extractor.find_urls(text)))
   
       for x in links:
           tld = tldextract.extract(x)
           suffix = tld.suffix
           if ".uk" in suffix:
               uk_urls.append(tld)
           if ".us" in suffix:
               foreign_urls.append(tld)
           if ".ca" in suffix:
               foreign_urls.append(tld)
           if ".nz" in suffix:
               foreign_urls.append(tld)
           if ".au" in suffix:
               foreign_urls.append(tld)
           if ".in" in suffix:
               foreign_urls.append(tld)
           if ".pk" in suffix:
               foreign_urls.append(tld)
       return pd.Series([uk_urls, foreign_urls])
   
   
   df = pd.DataFrame()
   datalist = [str(i) for i in range(int(10000))]
   df["data"] = datalist
   
   df[
       [
           "possible_uk_numbers",
           "possible_foreign_numbers",
           "definite_uk_numbers",
           "definite_foreign_numbers",
       ]
   ] = df.data.progress_apply(find_phone_numbers)
   df[["uk_urls", "foreign_urls"]] = df.data.progress_apply(find_urls)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to