This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 610594d87d [INLONG-11567][Manager] Optimize the original DAG of Airflow (#11568)
610594d87d is described below

commit 610594d87de6e505b015a6e63b0c20128fe6db45
Author: Zkplo <[email protected]>
AuthorDate: Tue Dec 3 13:06:01 2024 +0800

    [INLONG-11567][Manager] Optimize the original DAG of Airflow (#11568)
    
    Co-authored-by: ZKpLo <[email protected]>
---
 .../src/test/resources/airflow/dag_cleaner.py      | 49 ++++++++++++++--------
 .../src/test/resources/airflow/dag_creator.py      |  9 ++--
 2 files changed, 36 insertions(+), 22 deletions(-)

diff --git a/inlong-manager/manager-schedule/src/test/resources/airflow/dag_cleaner.py b/inlong-manager/manager-schedule/src/test/resources/airflow/dag_cleaner.py
index be20fe1bb1..a24c5bebe4 100644
--- a/inlong-manager/manager-schedule/src/test/resources/airflow/dag_cleaner.py
+++ b/inlong-manager/manager-schedule/src/test/resources/airflow/dag_cleaner.py
@@ -29,33 +29,46 @@ from airflow.hooks.base_hook import BaseHook
 from airflow import configuration
 
 DAG_PATH = configuration.get('core', 'dags_folder') + "/"
-
+DAG_PREFIX = 'inlong_offline_task_'
 
 def clean_expired_dags(**context):
+
     original_time = context.get('execution_date')
     target_timezone = pytz.timezone("Asia/Shanghai")
     utc_time = original_time.astimezone(target_timezone)
-    current_time = utc_time.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
-    logging.info(f"Current time: {current_time}")
-    for dag_file in os.listdir(DAG_PATH):
-        if dag_file.endswith(".py") and dag_file.startswith("inlong_offline_task_"):
-            with open(DAG_PATH + dag_file, "r") as file:
-                line = file.readline()
-                while line and "end_offset_datetime_str" not in line:
+    current_time = utc_time.strftime("%Y-%m-%d %H:%M:%S.%f")
+
+    conf = context.get('dag_run').conf
+    groupId = conf.get('inlong_group_id')
+
+    logging.info(f"Execution parameters = {conf} for groupId = {groupId} and execution time = {current_time}")
+
+    if groupId is None or len(groupId) == 0:
+        for dag_file in os.listdir(DAG_PATH):
+            if dag_file.endswith(".py") and dag_file.startswith(DAG_PREFIX):
+                dag_file_path = os.path.join(DAG_PATH, dag_file)
+                with open(dag_file_path, "r") as file:
                     line = file.readline()
-                end_date_str = None
-                if len(line.split("=")) > 1:
-                    end_date_str = line.split("=")[1].strip().strip("\"")
-                logging.info(f"DAG end time: {end_date_str}")
-                if end_date_str:
+                    while line and "end_offset_datetime_str" not in line:
+                        line = file.readline()
+                    end_date_str = None
+                    row = line.split("=")
+                    if len(row) > 1:
+                        end_date_str = datetime.fromtimestamp(int(row[1].strip().strip("\"")) / 1000, tz=target_timezone)
+                    logging.info(f"The end time of {dag_file} is {end_date_str} for groupId = {dag_file.lstrip(DAG_PREFIX)}")
                     try:
-                        if str(current_time) > str(end_date_str):
-                            dag_file_path = os.path.join(DAG_PATH, dag_file)
+                        if end_date_str and str(current_time) > str(end_date_str):
                             os.remove(dag_file_path)
-                            # Optionally, delete the end_date variable
-                            logging.info(f"Deleted expired DAG: {dag_file}")
+                            logging.info(f"Deleted expired DAG: {dag_file} for groupId = {dag_file.lstrip(DAG_PREFIX)}")
                     except ValueError:
-                        logging.error(f"Invalid date format for DAG {dag_file}: {end_date_str}")
+                        logging.error(f"Failed to delete {dag_file} for groupId = {dag_file.lstrip(DAG_PREFIX)}")
+    else:
+        dag_file = groupId + '.py'
+        if not str(groupId).startswith(DAG_PREFIX):
+            dag_file = DAG_PREFIX + dag_file
+        os.remove(os.path.join(DAG_PATH, dag_file))
+        logging.info(f"Deleted expired DAG: {dag_file} for groupId = {groupId}")
+
 
 
 default_args = {
diff --git a/inlong-manager/manager-schedule/src/test/resources/airflow/dag_creator.py b/inlong-manager/manager-schedule/src/test/resources/airflow/dag_creator.py
index 4034cf467c..1958ab594a 100644
--- a/inlong-manager/manager-schedule/src/test/resources/airflow/dag_creator.py
+++ b/inlong-manager/manager-schedule/src/test/resources/airflow/dag_creator.py
@@ -62,7 +62,7 @@ groupId = "{groupId}"
 connectionId = "{connectionId}"
 boundaryType = "{boundaryType}"
 
-target_timezone = pytz.timezone(timezone)
+target_timezone = pytz.timezone(timezone)  # Specify the time zone as China Standard Time
 
 start_date = datetime.fromtimestamp(start_offset_datetime_str / 1000, tz=target_timezone)
 end_date = datetime.fromtimestamp(end_offset_datetime_str / 1000, tz=target_timezone)
@@ -75,16 +75,13 @@ def taskFunction(**context):
         "username": conn.login,
         "password": conn.password
     }}
-    print("params", params)
     headers = {{
         "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:131.0) Gecko/20100101 Firefox/131.0",
         "Accept": "application/json",
         "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
         "Accept-Encoding": "gzip, deflate",
-        "Referer": "http://192.168.101.2:8083/",
         "Content-Type": "application/json;charset=UTF-8",
         "tenant": "public",
-        "Origin": "http://192.168.101.2",
         "Connection": "close",
         "Priority": "u=0"
     }}
@@ -95,8 +92,12 @@ def taskFunction(**context):
         "lowerBoundary": str(int(time_interval[0])),
         "upperBoundary": str(int(int(time_interval[1])))
     }}
+    print("Connection ID: ", connectionId)
+    print("url: ", url)
+    print("params: ", params)
     print("Request Body: ", data)
     response = requests.post(url, params=params, headers=headers, json=data)
+    print("Response Code: ", response.status_code)
     if response.status_code == 200:
         print(response.json())
     else:

Reply via email to