Zkplo opened a new pull request, #11479:
URL: https://github.com/apache/inlong/pull/11479
Fixes #11400
### Motivation
Support Apache Airflow as a third-party schedule engine for InLong offline synchronization tasks.
### Modifications
#### Need to know about Airflow
1. By default, Airflow rejects all REST API requests. We need to configure it according to the [official documentation](https://airflow.apache.org/docs/apache-airflow-providers-fab/stable/auth-manager/api-authentication.html) before the Manager can call the API (see the request sketch after the two DAG files below).
2. Airflow Connections are used to store the credentials for connecting to other systems, keeping those credentials out of DAG code. For details, see https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html.
3. Airflow does not provide an API for DAG creation, so integrating with InLong requires shipping the original DAG files shown below.
<details>
<summary>dag_cleaner.py</summary>
````python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime, timedelta
import os
import logging

import pytz
from airflow import DAG, configuration
from airflow.operators.python_operator import PythonOperator

DAG_PATH = configuration.get('core', 'dags_folder') + "/"


def clean_expired_dags(**context):
    # Convert the execution date to the target timezone and format it to
    # match the end-time string embedded in each generated DAG file.
    original_time = context.get('execution_date')
    target_timezone = pytz.timezone("Asia/Shanghai")
    local_time = original_time.astimezone(target_timezone)
    current_time = local_time.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
    logging.info(f"Current time: {current_time}")
    for dag_file in os.listdir(DAG_PATH):
        if dag_file.endswith(".py") and dag_file.startswith("inlong_offline_task_"):
            # Scan the generated DAG file for its embedded end time.
            with open(DAG_PATH + dag_file, "r") as file:
                line = file.readline()
                while line and "end_offset_datetime_str" not in line:
                    line = file.readline()
                end_date_str = None
                if len(line.split("=")) > 1:
                    end_date_str = line.split("=")[1].strip().strip("\"")
            logging.info(f"DAG end time: {end_date_str}")
            if end_date_str:
                try:
                    if str(current_time) > str(end_date_str):
                        dag_file_path = os.path.join(DAG_PATH, dag_file)
                        os.remove(dag_file_path)
                        # Optionally, delete the end_date variable
                        logging.info(f"Deleted expired DAG: {dag_file}")
                except ValueError:
                    logging.error(f"Invalid date format for DAG {dag_file}: {end_date_str}")


default_args = {
    'owner': 'airflow',
    'start_date': datetime.now() - timedelta(minutes=5),
}

# catchup and tags are DAG-level arguments, so they are set on the DAG itself.
dag = DAG(
    'dag_cleaner',
    default_args=default_args,
    schedule_interval="*/20 * * * *",
    catchup=False,
    tags=["inlong"],
    is_paused_upon_creation=False,
)

clean_task = PythonOperator(
    task_id='clean_expired_dags',
    python_callable=clean_expired_dags,
    provide_context=True,
    dag=dag,
)
````
</details>
<details>
<summary>dag_creator.py</summary>
````python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os

from airflow import DAG, configuration
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

DAG_PATH = configuration.get('core', 'dags_folder') + "/"
DAG_PREFIX = 'inlong_offline_task_'


def create_dag_file(**context):
    # Read the task definition passed by InLong Manager via dag_run.conf.
    conf = context.get('dag_run').conf
    print('conf: ', conf)
    groupId = conf.get('inlong_group_id')
    task_name = DAG_PREFIX + groupId
    timezone = conf.get('timezone')
    boundaryType = str(conf.get('boundary_type'))
    start_time = int(conf.get('start_time'))
    end_time = int(conf.get('end_time'))
    cron_expr = conf.get('cron_expr')
    seconds_interval = conf.get('seconds_interval')
    # Prefer the cron expression; fall back to a fixed interval in seconds.
    if cron_expr is None or len(cron_expr) == 0:
        schedule_interval = f'timedelta(seconds={seconds_interval})'
    else:
        schedule_interval = '"' + cron_expr + '"'
    connectionId = conf.get('connection_id')
    dag_content = f'''from datetime import datetime, timedelta

import pytz
import requests
from airflow import DAG
from airflow.hooks.base_hook import BaseHook
from airflow.operators.python_operator import PythonOperator
from croniter import croniter

timezone = "{timezone}"
start_offset_datetime_str = {start_time}
end_offset_datetime_str = {end_time}
schedule_interval = {schedule_interval}  # Or put cron expression
dag_id = "{task_name}"
groupId = "{groupId}"
connectionId = "{connectionId}"
boundaryType = "{boundaryType}"

target_timezone = pytz.timezone(timezone)
start_date = datetime.fromtimestamp(start_offset_datetime_str / 1000, tz=target_timezone)
end_date = datetime.fromtimestamp(end_offset_datetime_str / 1000, tz=target_timezone)


def taskFunction(**context):
    print("#########################")
    # Credentials for calling back to InLong Manager are stored in an
    # Airflow Connection and resolved at run time.
    conn = BaseHook.get_connection(connectionId)
    url = f"http://{{conn.host}}:{{conn.port}}/{{conn.schema}}"
    params = {{
        "username": conn.login,
        "password": conn.password
    }}
    print("params", params)
    headers = {{
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:131.0) Gecko/20100101 Firefox/131.0",
        "Accept": "application/json",
        "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
        "Accept-Encoding": "gzip, deflate",
        "Referer": "http://192.168.101.2:8083/",
        "Content-Type": "application/json;charset=UTF-8",
        "tenant": "public",
        "Origin": "http://192.168.101.2",
        "Connection": "close",
        "Priority": "u=0"
    }}
    time_interval = get_time_interval(context)
    data = {{
        "boundaryType": boundaryType,
        "groupId": groupId,
        "lowerBoundary": str(int(time_interval[0])),
        "upperBoundary": str(int(time_interval[1]))
    }}
    print("Request Body: ", data)
    response = requests.post(url, params=params, headers=headers, json=data)
    if response.status_code == 200:
        print(response.json())
    else:
        print(response.text)
    print("#########################")


def get_time_interval(context):
    # The data boundary is [execution time, next run time).
    execution_date = context.get('execution_date')
    execution_date = execution_date.astimezone(target_timezone)
    dag = context.get('dag')
    schedule_interval = dag.schedule_interval
    if isinstance(schedule_interval, timedelta):
        return execution_date.timestamp(), (execution_date + schedule_interval).timestamp()
    else:
        cron = croniter(schedule_interval, execution_date)
        next_run = cron.get_next(datetime)
        return execution_date.timestamp(), next_run.timestamp()


default_args = {{
    'owner': 'inlong',
    'start_date': start_date,
    'end_date': end_date,
}}

dag = DAG(
    dag_id,
    default_args=default_args,
    schedule_interval=schedule_interval,
    catchup=False,
    is_paused_upon_creation=False
)

clean_task = PythonOperator(
    task_id=dag_id,
    python_callable=taskFunction,
    provide_context=True,
    dag=dag,
)
'''
    dag_file_path = os.path.join(DAG_PATH, f'{task_name}.py')
    with open(dag_file_path, 'w') as f:
        f.write(dag_content)
    print(f'Generated DAG file: {dag_file_path}')


default_args = {'owner': 'airflow', 'start_date': days_ago(1)}

dag = DAG(
    'dag_creator',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
    is_paused_upon_creation=False,
)

create_dag_task = PythonOperator(
    task_id='create_dag_file',
    python_callable=create_dag_file,
    provide_context=True,
    dag=dag,
)
````
</details>
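As a rough illustration of points 1 and 2 above, the sketch below triggers `dag_creator` through Airflow's stable REST API with HTTP basic auth, passing the fields that `create_dag_file` reads from `dag_run.conf`. This is not the Manager's actual Java code; the host, credentials, and conf values are placeholders.

````python
# Minimal sketch (placeholders only, not InLong Manager's implementation).
# Assumes the basic-auth API backend from point 1 is enabled in airflow.cfg.
import requests

AIRFLOW_BASE = "http://localhost:8080/api/v1"  # placeholder Airflow endpoint
AUTH = ("admin", "admin")                      # placeholder credentials

# Fields read by create_dag_file() from dag_run.conf.
conf = {
    "inlong_group_id": "test_offline_1",
    "timezone": "Asia/Shanghai",
    "boundary_type": "TIME",            # placeholder value
    "start_time": 1731040000000,        # epoch milliseconds
    "end_time": 1731100000000,          # epoch milliseconds
    "cron_expr": "*/5 * * * *",         # empty -> falls back to seconds_interval
    "seconds_interval": 300,
    "connection_id": "inlong_manager",  # Airflow Connection holding Manager credentials
}

resp = requests.post(
    f"{AIRFLOW_BASE}/dags/dag_creator/dagRuns",
    auth=AUTH,
    json={"conf": conf},
)
resp.raise_for_status()
print(resp.json())
````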
#### System design:
1. To ease future maintenance and extension of the Airflow interface support, the `AirflowApi` interface and the `BaseAirflowApi` abstract class are introduced; subsequent extensions only need to build on them.
2. A unified request client, `AirflowServerClient`, is implemented for calling these interfaces.
3. Two interceptors are added to the OkHttpClient: `AirflowAuthInterceptor` handles unified authorization of the requests, and `LoggingInterceptor` handles logging (a rough sketch of the pattern follows this list).
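The interceptors themselves are Java/OkHttp classes and are not reproduced in this description. As a rough Python analogue of the same pattern (placeholder credentials, not the PR's code), `requests` can attach authorization to every call through a custom `AuthBase` and log every response through a session hook:

````python
# Rough Python analogue of AirflowAuthInterceptor + LoggingInterceptor
# (the PR implements these in Java on top of OkHttpClient).
import logging
import requests
from requests.auth import AuthBase, HTTPBasicAuth

logging.basicConfig(level=logging.INFO)

class AirflowAuth(AuthBase):
    """Counterpart of AirflowAuthInterceptor: add credentials to every request."""
    def __init__(self, username, password):
        self._basic = HTTPBasicAuth(username, password)

    def __call__(self, request):
        return self._basic(request)

def logging_hook(response, **kwargs):
    """Counterpart of LoggingInterceptor: log URL, method, and status code."""
    logging.info("Airflow API request - URL: %s, Method: %s, Status: %s",
                 response.url, response.request.method, response.status_code)

session = requests.Session()
session.auth = AirflowAuth("admin", "admin")  # placeholder credentials
session.hooks["response"].append(logging_hook)
# Every call through this session is now authorized and logged, e.g.:
# session.get("http://localhost:8080/api/v1/dags")
````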
#### Results:
When we issue a scheduled task, Airflow's `dag_creator` receives the information from InLong Manager and creates an offline-task DAG based on it, as shown in the figure below.

<details>
<summary>Inlong Manager Log</summary>
```
[ ] 2024-11-08 12:38:22.667 - INFO [inlong-workflow-0]
.a.i.m.s.s.ScheduleServiceImpl:131 - success to update schedule status from 100
to 101 for groupId=test_offline_1
[ ] 2024-11-08 12:38:22.672 - INFO [inlong-workflow-0]
.a.i.m.s.ScheduleClientFactory:51 - Get schedule engine client success for
Airflow
[ ] 2024-11-08 12:38:22.672 - INFO [inlong-workflow-0]
.i.m.s.a.AirflowScheduleEngine:138 - Registering DAG for test_offline_1
[ ] 2024-11-08 12:38:23.120 - INFO [inlong-workflow-0]
a.i.m.s.a.i.LoggingInterceptor:38 - Airflow API request information - Address:
http://192.168.3.110:8080/api/v1/dags/dag_creator/dagRuns, URI:
http://192.168.3.110:8080/api/v1/dags/dag_creator/dagRuns, Request method:
POST, Response status code: 200
[ ] 2024-11-08 12:38:23.139 - INFO [inlong-workflow-0]
.a.i.m.s.s.ScheduleServiceImpl:131 - success to update schedule status from 101
to 102 for groupId=test_offline_1
[ ] 2024-11-08 12:38:23.139 - INFO [inlong-workflow-0]
a.i.m.s.s.ScheduleOperatorImpl:150 - Register schedule info success for group
test_offline_1
[ ] 2024-11-08 12:38:23.139 - INFO [inlong-workflow-0]
.GroupScheduleResourceListener:82 - success to process schedule resource for
group=test_offline_1
[ ] 2024-11-08 12:38:23.163 - INFO [inlong-workflow-0]
.l.g.InitGroupCompleteListener:78 - begin to execute InitGroupCompleteListener
for groupId=test_offline_1
[ ] 2024-11-08 12:38:23.164 - INFO [inlong-workflow-0]
i.m.s.g.InlongGroupServiceImpl:540 - begin to update group status to [130] for
groupId=test_offline_1 by user=admin
[ ] 2024-11-08 12:38:23.168 - INFO [inlong-workflow-0]
i.m.s.g.InlongGroupServiceImpl:558 - success to update group status to [130]
for groupId=test_offline_1 by user=admin
[ ] 2024-11-08 12:38:23.188 - WARN [inlong-workflow-0]
i.m.s.g.InlongGroupServiceImpl:249 - start time is less than current time,
re-set to current time for groupId=test_offline_1,
startTime=2024-11-08T12:34:47.000+0000,
newStartTime=2024-11-08T12:38:23.188+0000
[ ] 2024-11-08 12:38:23.197 - INFO [inlong-workflow-0]
.a.i.m.s.s.ScheduleServiceImpl:111 - success to update schedule info for
groupId=test_offline_1
[ ] 2024-11-08 12:38:23.202 - INFO [inlong-workflow-0]
.a.i.m.s.s.ScheduleServiceImpl:131 - success to update schedule status from 102
to 103 for groupId=test_offline_1
[ ] 2024-11-08 12:38:23.203 - INFO [inlong-workflow-0]
.i.m.s.a.AirflowScheduleEngine:203 - Updating DAG for test_offline_1
[ ] 2024-11-08 12:38:23.463 - INFO [inlong-workflow-0]
a.i.m.s.a.i.LoggingInterceptor:38 - Airflow API request information - Address:
http://192.168.3.110:8080/api/v1/dags/dag_creator/dagRuns, URI:
http://192.168.3.110:8080/api/v1/dags/dag_creator/dagRuns, Request method:
POST, Response status code: 200
```
</details>
After the task has run for some time, you can see that the last execution time and the interval to the next execution both meet expectations.

Now, I will modify the execution interval of this task through the InLong Dashboard.

You can see that the modification succeeded; the Last Run field of the Web UI is not updated immediately, but the change is already visible in Run After.

Next, change the scheduling period of this offline task to a Cron expression.

From the figure below we can see that the modification has been successful.

The execution results of each scheduled task are as follows:

There are two ways to delete DAG files. The first: `dag_cleaner` periodically scans the DAG directory for files matching the naming rule and checks whether their end time has passed the current time. The second: InLong Manager triggers `dag_cleaner` through an interface with parameters, and `dag_cleaner` deletes the DAG file directly; in this case, InLong Manager also deletes the DAG instance already loaded into memory through another interface (a sketch follows below).
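A minimal sketch of the second path (placeholder host, credentials, and conf key; the real parameter names live in the Manager's Airflow client):

````python
# Minimal sketch of the Manager-triggered cleanup path (placeholders only).
import requests

AIRFLOW_BASE = "http://localhost:8080/api/v1"  # placeholder Airflow endpoint
AUTH = ("admin", "admin")                      # placeholder credentials
dag_id = "inlong_offline_task_test_offline_1"

# 1) Trigger dag_cleaner with parameters so it deletes the DAG file directly.
requests.post(
    f"{AIRFLOW_BASE}/dags/dag_cleaner/dagRuns",
    auth=AUTH,
    json={"conf": {"dag_id": dag_id}},  # placeholder conf key
).raise_for_status()

# 2) Remove the DAG instance Airflow has already loaded, via the stable REST API.
requests.delete(f"{AIRFLOW_BASE}/dags/{dag_id}", auth=AUTH).raise_for_status()
````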

### Verifying this change
*(Please pick either of the following options)*
- [ ] This change is a trivial rework/code cleanup without any test coverage.
- [x] This change is already covered by existing tests, such as:
*(please describe tests)*
- [ ] This change added tests and can be verified as follows:
*(example:)*
- *Added integration tests for end-to-end deployment with large payloads
(10MB)*
- *Extended integration test for recovery after broker failure*