MartinKChen commented on issue #19043:
URL: https://github.com/apache/airflow/issues/19043#issuecomment-1006295682
@xiaochong2dai I did not fix the issue by modifying Airflow's source code, as
we are using MWAA (Amazon Managed Workflows for Apache Airflow), which makes it
impossible to reach the code.
Instead, our workaround for the issue is to create our own operator and DAGs
for the shard group:
1. Create an operator that inherits from SmartSensorOperator (the operator used
by the shards) and override the "execute" function, commenting out the code
that causes the shard to fail. Sample code below:
```
# Airflow imports
from airflow.sensors.smart_sensor import SmartSensorOperator
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.net import get_hostname
from airflow.utils.decorators import apply_defaults

# General imports for processing
from time import sleep


class MySmartSensorOperator(SmartSensorOperator):
    @apply_defaults
    def __init__(
        self,
        poke_interval=60,  # override the poke interval with our own value, which is not allowed by default
        smart_sensor_timeout=60 * 60 * 24 * 7,
        soft_fail=False,
        shard_min=0,
        shard_max=100000,
        poke_timeout=6.0,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.poke_interval = poke_interval
        self.soft_fail = soft_fail
        self.timeout = smart_sensor_timeout
        self._validate_input_values()
        self.hostname = ''

        self.sensor_works = []
        self.cached_dedup_works = {}
        self.cached_sensor_exceptions = {}

        self.max_tis_per_query = 50
        self.shard_min = shard_min
        self.shard_max = shard_max
        self.poke_timeout = poke_timeout

    def execute(self, context):
        # started_at = timezone.utcnow()

        self.hostname = get_hostname()
        while True:
            poke_start_time = timezone.utcnow()

            self.flush_cached_sensor_poke_results()

            self._load_sensor_works()
            self.log.info('Loaded %s sensor_works', len(self.sensor_works))
            Stats.gauge('smart_sensor_operator.loaded_tasks', len(self.sensor_works))

            for sensor_work in self.sensor_works:
                self._execute_sensor_work(sensor_work)

            duration = (timezone.utcnow() - poke_start_time).total_seconds()

            self.log.info('Taking %s to execute %s tasks.', duration, len(self.sensor_works))

            Stats.timing('smart_sensor_operator.loop_duration', duration)
            Stats.gauge('smart_sensor_operator.executed_tasks', len(self.sensor_works))
            self._emit_loop_stats()

            if duration < self.poke_interval:
                sleep(self.poke_interval - duration)

            # commented out the code that causes the shard to fail:
            # if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
            #     self.log.info('Time is out for smart sensor.')
            #     return
```
2. Create shard DAGs that use the operator above. This is copied from
"airflow-2.0.2\airflow\smart_sensor_dags\smart_sensor_group.py" with a
different operator:
```
"""Smart sensor DAGs managing all smart sensor tasks."""
from datetime import timedelta

from airflow.configuration import conf
from airflow.models import DAG
from airflow.utils.dates import days_ago

from operators.mec_smartsensor_operator import MySmartSensorOperator

default_args = {
    'owner': 'admin',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'start_date': days_ago(1),
    'catchup': False,
}

schedule_interval = timedelta(minutes=5)

num_smart_sensor_shard = conf.getint('smart_sensor', 'shards')
shard_code_upper_limit = conf.getint('smart_sensor', 'shard_code_upper_limit')

for i in range(num_smart_sensor_shard):
    shard_min = (i * shard_code_upper_limit) / num_smart_sensor_shard
    shard_max = ((i + 1) * shard_code_upper_limit) / num_smart_sensor_shard

    dag_id = f'admin_my_smartsensor_{i}'
    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        schedule_interval=schedule_interval,
        concurrency=1,
        max_active_runs=1,
        dagrun_timeout=timedelta(hours=24),
    )

    MySmartSensorOperator(
        task_id='smart_sensor_task',
        dag=dag,
        retries=100,
        retry_delay=timedelta(seconds=10),
        priority_weight=999,
        shard_min=shard_min,
        shard_max=shard_max,
        poke_timeout=10,
        smart_sensor_timeout=timedelta(hours=1).total_seconds(),
    )

    globals()[dag_id] = dag
```
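To make the shard math concrete, here is a minimal sketch of the ranges the loop above produces; the values shards = 5 and shard_code_upper_limit = 10000 are just example settings for the [smart_sensor] section, not necessarily what you have configured:
```
# Worked example of the shard ranges computed by the loop above.
# Assumption: shards = 5 and shard_code_upper_limit = 10000.
num_smart_sensor_shard = 5
shard_code_upper_limit = 10000

for i in range(num_smart_sensor_shard):
    shard_min = (i * shard_code_upper_limit) / num_smart_sensor_shard
    shard_max = ((i + 1) * shard_code_upper_limit) / num_smart_sensor_shard
    print(f'admin_my_smartsensor_{i}: shard_min={shard_min}, shard_max={shard_max}')

# admin_my_smartsensor_0: shard_min=0.0, shard_max=2000.0
# admin_my_smartsensor_1: shard_min=2000.0, shard_max=4000.0
# admin_my_smartsensor_2: shard_min=4000.0, shard_max=6000.0
# admin_my_smartsensor_3: shard_min=6000.0, shard_max=8000.0
# admin_my_smartsensor_4: shard_min=8000.0, shard_max=10000.0
```
Because the same formula is used in smart_sensor_group.py, each of our DAGs covers exactly the range of one built-in smart_sensor_group_shard_* DAG.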
One downside of this solution: the smart_sensor_group_shard_* DAGs are still
created automatically, since they are generated by Airflow itself. We just keep
them paused and enable our own DAGs instead, which may cause confusion when
operating.
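For completeness, here is a minimal sketch of how the built-in shard DAGs could be kept paused programmatically. This is only an assumption on my side, not what we do: it requires direct access to Airflow's metadata database, which is not reachable on MWAA, so there we simply keep them paused in the UI:
```
# Sketch: pause every built-in smart sensor shard DAG via the metadata DB.
# Assumption: this runs somewhere with direct access to the Airflow metadata
# database (not possible on MWAA, where pausing in the UI is the option).
from airflow.models import DagModel
from airflow.utils.session import create_session

with create_session() as session:
    for dag_model in session.query(DagModel).filter(
        DagModel.dag_id.like('smart_sensor_group_shard_%')
    ):
        dag_model.is_paused = True
```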