krisfur commented on code in PR #34627:
URL: https://github.com/apache/airflow/pull/34627#discussion_r1339757940
##########
airflow/providers/microsoft/azure/operators/container_instances.py:
##########
@@ -319,6 +319,9 @@ def _monitor_logging(self, resource_group: str, name: str)
-> int:
if state in ["Running", "Terminated", "Succeeded"]:
try:
logs = self._ci_hook.get_logs(resource_group, name)
+ if logs is None:
+ self.log.exception("Container log is broken,
marking as failed.")
Review Comment:
Interesting! Looking at `execute`, if that function were rerun for any reason
we would see another "Container group started" message. However, the only new
messages when the bug happens are the container state changing to
Waiting/Pending. This suggests that `_monitor_logging` never completes: it just
throws errors and then correctly carries on, reporting the state change.
Interestingly, when we ran the same log-failure-inducing code in Azure
containers deployed manually (rather than through Airflow), they did not
restart. That means either this is an Airflow-side issue, **or** Azure resets
the container to Waiting/Pending when it is asked for a log that comes back as
None. In the latter case it cannot be solved on the Airflow side: it would be an
Azure-side issue triggered by requesting a broken log, which we can't really
debug.
Our deployment setup: we deploy an Azure container instance from a simple
Docker image (which just contains a pyenv with the required libraries and the
script to run). Our Airflow is hosted on an Azure VM, inside a virtual
environment.
DAG used for our crash-inducing test run:
```
import airflow
import datetime
from isodate import DT_BAS_COMPLETE
from lib.default_settings import defaults, creds
from airflow.decorators import task
import json
import airflow.providers.microsoft.azure.operators.azure_container_instances
as ACI
class CaptureACIOperator(ACI.AzureContainerInstancesOperator):
    """Run ``run_log_test.py`` inside an Azure Container Instance.

    Thin subclass of ``AzureContainerInstancesOperator`` that forces the
    container ``command`` and ``name``. Every other keyword argument is
    forwarded unchanged to the parent operator.

    NOTE(review): this class does not read any ``custom_args`` (map_index,
    config, output, date, project, capture). If those keys are expected to
    drive ACI kwargs rendering, that logic is not present here -- confirm.
    """

    def __init__(self, **kwargs):
        """Override ``command`` and ``name``, then defer to the parent operator.

        Any ``command`` or ``name`` supplied by the caller is replaced.
        """
        # Bash script run in the container: activate the virtualenv, run the
        # test script, then echo its exit code and exit with that same code.
        command = (
            "\n"
            "source /root/pyenv/bin/activate\n"
            "python /root/ext/ds-models/bin/run_log_test.py\n"
            "CAPEXIT=$?\n"
            'echo "Exit code $CAPEXIT"\n'
            "exit $CAPEXIT\n"
        )
        kwargs["command"] = ["/bin/bash", "-c", command]
        # NOTE(review): a constant container-group name presumably means
        # concurrent runs target the same group -- confirm this is intended.
        kwargs["name"] = "ds-ci-log-test"
        super().__init__(**kwargs)
# Weekly DAG with a single task that launches the log-test container on ACI.
with airflow.DAG(
    "log_test",
    description="",
    default_args=defaults.default_none,
    schedule_interval="0 0 * * 3",  # cron: 00:00 every Wednesday
    start_date=datetime.datetime(2022, 1, 1),
    catchup=False,
    tags=["development"],
) as dag:
    t1 = CaptureACIOperator(
        dag=dag,
        task_id="log_test",
        # Container image and the connections used to pull and run it.
        image="asadsregistry.azurecr.io/dsmodel:log_test",
        ci_conn_id="azure-ds-ci-airflow-dspython",
        registry_conn_id="azure-asadsregistry",
        # Where the container group is created.
        resource_group="Machine-Learning",
        region="UK South",
        # Resources requested for the container.
        memory_in_gb=8,
        cpu=4,
        # Airflow pool accounting for this task.
        pool="ci_cpus",
        pool_slots=4,
    )
```
The script used to induce the failures (just multiple tqdm progress bars over
code that takes a while to run):
```
import re
import pandas as pd
import phonenumbers
import tldextract
from sentence_transformers import SentenceTransformer
from urlextract import URLExtract
import logging
from tqdm.auto import tqdm
# Register tqdm's pandas integration so that ``progress_apply`` is available
# on pandas objects; the ``df.data.progress_apply`` calls below rely on it.
tqdm.pandas()
def find_phone_numbers(text):
    """Find UK and selected foreign phone numbers in *text*.

    ``phonenumbers.PhoneNumberMatcher`` is run once with "GB" as the default
    region and once for each of US, AU, NZ, CA and ZA. Every match is
    formatted in NATIONAL format.

    Args:
        text: the string to scan.

    Returns:
        pd.Series with four entries, each a sorted list of unique
        national-format number strings:
          0. numbers matched when parsing with region "GB";
          1. numbers matched under any of the foreign regions;
          2. entries of (0) that do not appear in (1);
          3. entries of (1) that do not appear in (0).
        Lists are sorted so the output is deterministic: plain
        ``list(set(...))`` order depends on the string-hash seed.
    """
    national = phonenumbers.PhoneNumberFormat.NATIONAL

    def _matches(region):
        # Unique national-format strings for every number found with `region`
        # as the matcher's default region.
        return {
            phonenumbers.format_number(match.number, national)
            for match in phonenumbers.PhoneNumberMatcher(text, region)
        }

    uk_numbers = _matches("GB")
    foreign_numbers = set()
    for region in ("US", "AU", "NZ", "CA", "ZA"):
        foreign_numbers |= _matches(region)

    return pd.Series(
        [
            sorted(uk_numbers),
            sorted(foreign_numbers),
            # Set differences replace the original O(n*m) list-membership scans.
            sorted(uk_numbers - foreign_numbers),
            sorted(foreign_numbers - uk_numbers),
        ]
    )
def find_urls(text):
    """Extract URLs from *text* and bucket them by country-code suffix.

    Each unique URL found by ``URLExtract`` is split with
    ``tldextract.extract``. It is classified by the last label of its public
    suffix, so "co.uk" and "uk" both count as UK.

    Args:
        text: the string to scan.

    Returns:
        pd.Series of two lists, ``[uk_urls, foreign_urls]``. Each list holds
        the ``tldextract`` results for URLs whose suffix ends in a UK or a
        listed foreign country code (us, ca, nz, au, in, pk). Other URLs are
        dropped.
    """
    uk_codes = {"uk"}
    foreign_codes = {"us", "ca", "nz", "au", "in", "pk"}
    uk_urls = []
    foreign_urls = []
    extractor = URLExtract()
    for link in set(extractor.find_urls(text)):
        tld = tldextract.extract(link)
        # Compare the final suffix label instead of substring-matching ".uk"
        # etc. The substring test missed bare country-code suffixes (e.g.
        # "example.uk" -> suffix "uk", "example.ca" -> suffix "ca"), and its
        # independent checks could append one URL more than once.
        # DNS labels are case-insensitive, hence lower().
        country = tld.suffix.rsplit(".", 1)[-1].lower()
        if country in uk_codes:
            uk_urls.append(tld)
        elif country in foreign_codes:
            foreign_urls.append(tld)
    return pd.Series([uk_urls, foreign_urls])
# Synthetic workload: 10,000 short numeric strings run through both
# extractors via ``progress_apply``, so each pass renders a tqdm progress bar.
datalist = list(map(str, range(10000)))
df = pd.DataFrame()
df["data"] = datalist

# find_phone_numbers returns a 4-element Series per row, which pandas
# expands into these four columns.
phone_results = df["data"].progress_apply(find_phone_numbers)
df[
    [
        "possible_uk_numbers",
        "possible_foreign_numbers",
        "definite_uk_numbers",
        "definite_foreign_numbers",
    ]
] = phone_results

# find_urls returns a 2-element Series per row: UK URLs, then foreign URLs.
url_results = df["data"].progress_apply(find_urls)
df[["uk_urls", "foreign_urls"]] = url_results
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]