This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 610594d87d [INLONG-11567][Manager] Optimize the original DAG of Airflow (#11568)
610594d87d is described below
commit 610594d87de6e505b015a6e63b0c20128fe6db45
Author: Zkplo <[email protected]>
AuthorDate: Tue Dec 3 13:06:01 2024 +0800
[INLONG-11567][Manager] Optimize the original DAG of Airflow (#11568)
Co-authored-by: ZKpLo <[email protected]>
---
.../src/test/resources/airflow/dag_cleaner.py | 49 ++++++++++++++--------
.../src/test/resources/airflow/dag_creator.py | 9 ++--
2 files changed, 36 insertions(+), 22 deletions(-)
diff --git a/inlong-manager/manager-schedule/src/test/resources/airflow/dag_cleaner.py b/inlong-manager/manager-schedule/src/test/resources/airflow/dag_cleaner.py
index be20fe1bb1..a24c5bebe4 100644
--- a/inlong-manager/manager-schedule/src/test/resources/airflow/dag_cleaner.py
+++ b/inlong-manager/manager-schedule/src/test/resources/airflow/dag_cleaner.py
@@ -29,33 +29,46 @@ from airflow.hooks.base_hook import BaseHook
from airflow import configuration
DAG_PATH = configuration.get('core', 'dags_folder') + "/"
-
+DAG_PREFIX = 'inlong_offline_task_'
def clean_expired_dags(**context):
+
original_time = context.get('execution_date')
target_timezone = pytz.timezone("Asia/Shanghai")
utc_time = original_time.astimezone(target_timezone)
- current_time = utc_time.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
- logging.info(f"Current time: {current_time}")
- for dag_file in os.listdir(DAG_PATH):
- if dag_file.endswith(".py") and dag_file.startswith("inlong_offline_task_"):
- with open(DAG_PATH + dag_file, "r") as file:
- line = file.readline()
- while line and "end_offset_datetime_str" not in line:
+ current_time = utc_time.strftime("%Y-%m-%d %H:%M:%S.%f")
+
+ conf = context.get('dag_run').conf
+ groupId = conf.get('inlong_group_id')
+
+ logging.info(f"Execution parameters = {conf} for groupId = {groupId} and execution time = {current_time}")
+
+ if groupId is None or len(groupId) == 0:
+ for dag_file in os.listdir(DAG_PATH):
+ if dag_file.endswith(".py") and dag_file.startswith(DAG_PREFIX):
+ dag_file_path = os.path.join(DAG_PATH, dag_file)
+ with open(dag_file_path, "r") as file:
line = file.readline()
- end_date_str = None
- if len(line.split("=")) > 1:
- end_date_str = line.split("=")[1].strip().strip("\"")
- logging.info(f"DAG end time: {end_date_str}")
- if end_date_str:
+ while line and "end_offset_datetime_str" not in line:
+ line = file.readline()
+ end_date_str = None
+ row = line.split("=")
+ if len(row) > 1:
+ end_date_str = datetime.fromtimestamp(int(row[1].strip().strip("\"")) / 1000, tz=target_timezone)
+ logging.info(f"The end time of {dag_file} is {end_date_str} for groupId = {dag_file.lstrip(DAG_PREFIX)}")
try:
- if str(current_time) > str(end_date_str):
- dag_file_path = os.path.join(DAG_PATH, dag_file)
+ if end_date_str and str(current_time) > str(end_date_str):
os.remove(dag_file_path)
- # Optionally, delete the end_date variable
- logging.info(f"Deleted expired DAG: {dag_file}")
+ logging.info(f"Deleted expired DAG: {dag_file} for groupId = {dag_file.lstrip(DAG_PREFIX)}")
except ValueError:
- logging.error(f"Invalid date format for DAG {dag_file}: {end_date_str}")
+ logging.error(f"Failed to delete {dag_file} for groupId = {dag_file.lstrip(DAG_PREFIX)}")
+ else:
+ dag_file = groupId + '.py'
+ if not str(groupId).startswith(DAG_PREFIX):
+ dag_file = DAG_PREFIX + dag_file
+ os.remove(os.path.join(DAG_PATH, dag_file))
+ logging.info(f"Deleted expired DAG: {dag_file} for groupId = {groupId}")
+
default_args = {
diff --git a/inlong-manager/manager-schedule/src/test/resources/airflow/dag_creator.py b/inlong-manager/manager-schedule/src/test/resources/airflow/dag_creator.py
index 4034cf467c..1958ab594a 100644
--- a/inlong-manager/manager-schedule/src/test/resources/airflow/dag_creator.py
+++ b/inlong-manager/manager-schedule/src/test/resources/airflow/dag_creator.py
@@ -62,7 +62,7 @@ groupId = "{groupId}"
connectionId = "{connectionId}"
boundaryType = "{boundaryType}"
-target_timezone = pytz.timezone(timezone)
+target_timezone = pytz.timezone(timezone) # Specify the time zone as China Standard Time
start_date = datetime.fromtimestamp(start_offset_datetime_str / 1000, tz=target_timezone)
end_date = datetime.fromtimestamp(end_offset_datetime_str / 1000, tz=target_timezone)
@@ -75,16 +75,13 @@ def taskFunction(**context):
"username": conn.login,
"password": conn.password
}}
- print("params", params)
headers = {{
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:131.0) Gecko/20100101 Firefox/131.0",
"Accept": "application/json",
"Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
"Accept-Encoding": "gzip, deflate",
- "Referer": "http://192.168.101.2:8083/",
"Content-Type": "application/json;charset=UTF-8",
"tenant": "public",
- "Origin": "http://192.168.101.2",
"Connection": "close",
"Priority": "u=0"
}}
@@ -95,8 +92,12 @@ def taskFunction(**context):
"lowerBoundary": str(int(time_interval[0])),
"upperBoundary": str(int(int(time_interval[1])))
}}
+ print("Connection ID: ", connectionId)
+ print("url: ", url)
+ print("params: ", params)
print("Request Body: ", data)
response = requests.post(url, params=params, headers=headers, json=data)
+ print("Response Code: ", response.status_code)
if response.status_code == 200:
print(response.json())
else: