This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new befe172c08 [INLONG-11400][Manager] Support Airflow schedule engine (#11479)
befe172c08 is described below
commit befe172c08b4e07f6de378cd36c392ea452bf0fd
Author: Zkplo <[email protected]>
AuthorDate: Wed Nov 20 22:04:03 2024 +0800
[INLONG-11400][Manager] Support Airflow schedule engine (#11479)
---
.../pojo/schedule/airflow/AirflowConnection.java | 70 +++++
.../inlong/manager/pojo/schedule/airflow/DAG.java | 50 ++++
.../pojo/schedule/airflow/DAGCollection.java} | 30 ++-
.../manager/pojo/schedule/airflow/DAGRun.java | 50 ++++
.../manager/pojo/schedule/airflow/DAGRunConf.java | 68 +++++
.../manager/pojo/schedule/airflow/Error.java | 52 ++++
.../manager/schedule/ScheduleEngineType.java | 1 +
.../schedule/airflow/AirFlowAPIConstant.java | 40 +++
.../schedule/airflow/AirflowScheduleClient.java | 58 ++++
.../schedule/airflow/AirflowScheduleEngine.java | 258 ++++++++++++++++++
.../schedule/airflow/AirflowServerClient.java | 71 +++++
.../manager/schedule/airflow/api/AirflowApi.java | 75 ++++++
.../api/AirflowResponse.java} | 37 ++-
.../schedule/airflow/api/BaseAirflowApi.java | 149 +++++++++++
.../api/connection/AirflowConnectionCreator.java | 99 +++++++
.../api/connection/AirflowConnectionGetter.java | 61 +++++
.../airflow/api/dag/DAGCollectionUpdater.java | 79 ++++++
.../schedule/airflow/api/dag/DAGDeletor.java | 69 +++++
.../schedule/airflow/api/dag/DAGUpdater.java | 78 ++++++
.../airflow/api/dagruns/DAGRunsTrigger.java | 100 +++++++
.../schedule/airflow/config/AirflowConfig.java | 86 ++++++
.../interceptor/AirflowAuthInterceptor.java | 51 ++++
.../interceptor/LoggingInterceptor.java} | 32 ++-
.../util/DAGUtil.java} | 19 +-
.../manager/schedule/airflow/util/DateUtil.java | 58 ++++
.../exception/AirflowScheduleException.java | 62 +++++
.../schedule/airflow/AirflowContainerEnv.java | 139 ++++++++++
.../airflow/AirflowScheduleEngineTest.java | 110 ++++++++
.../src/test/resources/airflow/dag_cleaner.py | 80 ++++++
.../src/test/resources/airflow/dag_creator.py | 148 +++++++++++
.../src/test/resources/airflow/docker-compose.yaml | 292 +++++++++++++++++++++
.../src/test/resources/airflow/testGroup_cron.py | 112 ++++++++
.../src/test/resources/airflow/testGroup_normal.py | 110 ++++++++
.../src/main/resources/application-dev.properties | 10 +
.../src/main/resources/application-prod.properties | 10 +
.../src/main/resources/application-test.properties | 10 +
36 files changed, 2776 insertions(+), 48 deletions(-)
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/AirflowConnection.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/AirflowConnection.java
new file mode 100644
index 0000000000..deb056ed4f
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/AirflowConnection.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.schedule.airflow;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@JsonIgnoreProperties(ignoreUnknown = true)
+@ApiModel(description = "Full representation of the connection.")
+public class AirflowConnection {
+
+ @JsonProperty("connection_id")
+ @ApiModelProperty("The connection ID.")
+ private String connectionId;
+
+ @JsonProperty("conn_type")
+ @ApiModelProperty("The connection type.")
+ private String connType;
+
+ @JsonProperty("description")
+ @ApiModelProperty("The description of the connection.")
+ private String description;
+
+ @JsonProperty("host")
+ @ApiModelProperty("Host of the connection.")
+ private String host;
+
+ @JsonProperty("login")
+ @ApiModelProperty("Login of the connection.")
+ private String login;
+
+ @JsonProperty("schema")
+ @ApiModelProperty("Schema of the connection.")
+ private String schema;
+
+ @JsonProperty("port")
+ @ApiModelProperty("Port of the connection.")
+ private Integer port;
+
+ @JsonProperty("password")
+ @ApiModelProperty("Password of the connection.")
+ private String password;
+
+ @JsonProperty("extra")
+ @ApiModelProperty("Additional information description of the connection.")
+ private String extra;
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAG.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAG.java
new file mode 100644
index 0000000000..578eadb151
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAG.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.schedule.airflow;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+@ApiModel(description = "DAG Description Information.")
+public class DAG {
+
+ @JsonProperty("dag_id")
+ @ApiModelProperty("The ID of the DAG.")
+ private String dagId;
+
+ @JsonProperty("root_dag_id")
+    @ApiModelProperty("If the DAG is SubDAG then it is the top level DAG identifier. Otherwise, null.")
+ private String rootDagId;
+
+ @JsonProperty("is_paused")
+ @ApiModelProperty("Whether the DAG is paused.")
+ private Boolean isPaused;
+
+ @JsonProperty("is_active")
+ @ApiModelProperty("Whether the DAG is currently seen by the scheduler(s).")
+ private Boolean isActive;
+
+ @JsonProperty("description")
+    @ApiModelProperty("User-provided DAG description, which can consist of several sentences or paragraphs that describe DAG contents.")
+ private String description;
+}
diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGCollection.java
similarity index 55%
copy from inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGCollection.java
index ac71e4e2d1..7a52548f41 100644
--- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGCollection.java
@@ -15,20 +15,26 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.schedule;
+package org.apache.inlong.manager.pojo.schedule.airflow;
-import lombok.Getter;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
-@Getter
-public enum ScheduleEngineType {
+import java.util.List;
- NONE("None"),
- QUARTZ("Quartz"),
- DOLPHINSCHEDULER("DolphinScheduler");
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+@ApiModel(description = "Collection of DAGs.")
+public class DAGCollection {
- private final String type;
+ @JsonProperty("dags")
+ @ApiModelProperty("List of DAGs.")
+ private List<DAG> dags = null;
- ScheduleEngineType(String type) {
- this.type = type;
- }
-}
\ No newline at end of file
+ @JsonProperty("total_entries")
+ @ApiModelProperty("The length of DAG list.")
+ private Integer totalEntries;
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGRun.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGRun.java
new file mode 100644
index 0000000000..e9384c75da
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGRun.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.schedule.airflow;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+@ApiModel(description = "DAGRun Description Information.")
+public class DAGRun {
+
+ @JsonProperty("conf")
+    @ApiModelProperty("JSON object describing additional configuration parameters.")
+ private Object conf;
+
+ @JsonProperty("dag_id")
+ @ApiModelProperty("Airflow DAG id.")
+ private String dagId;
+
+ @JsonProperty("dag_run_id")
+ @ApiModelProperty("Airflow DAGRun id (Nullable).")
+ private String dagRunId;
+
+ @JsonProperty("end_date")
+ @ApiModelProperty("The end time of this DAGRun.")
+ private String endDate;
+
+ @JsonProperty("start_date")
+ @ApiModelProperty("The start time of this DAGRun.")
+ private String startDate;
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGRunConf.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGRunConf.java
new file mode 100644
index 0000000000..4154c2526c
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/DAGRunConf.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.schedule.airflow;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@JsonIgnoreProperties(ignoreUnknown = true)
+@ApiModel(description = "DAGRunConf Description Information.")
+public class DAGRunConf {
+
+ @JsonProperty("inlong_group_id")
+ @ApiModelProperty("Specify the Inlong group ID")
+ private String inlongGroupId;
+
+ @JsonProperty("start_time")
+ @ApiModelProperty("The start time of DAG scheduling.")
+ private long startTime;
+
+ @JsonProperty("end_time")
+ @ApiModelProperty("The end time of DAG scheduling.")
+ private long endTime;
+
+ @JsonProperty("boundary_type")
+ @ApiModelProperty("The offline task boundary type.")
+ private String boundaryType;
+
+ @JsonProperty("cron_expr")
+ @ApiModelProperty("Cron expression.")
+ private String cronExpr;
+
+ @JsonProperty("seconds_interval")
+ @ApiModelProperty("Time interval (in seconds).")
+ private String secondsInterval;
+
+ @JsonProperty("connection_id")
+ @ApiModelProperty("Airflow Connection Id of Inlong Manager.")
+ private String connectionId;
+
+ @JsonProperty("timezone")
+ @ApiModelProperty("The timezone.")
+ private String timezone;
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/Error.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/Error.java
new file mode 100644
index 0000000000..3eb76fd677
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/airflow/Error.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.schedule.airflow;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+import java.math.BigDecimal;
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+@ApiModel(description = "[RFC7807](https://tools.ietf.org/html/rfc7807) compliant response. ")
+public class Error {
+
+ @JsonProperty("detail")
+ @ApiModelProperty("Error Details.")
+ private String detail;
+
+ @JsonProperty("instance")
+ @ApiModelProperty("Error of the instance.")
+ private String instance;
+
+ @JsonProperty("status")
+ @ApiModelProperty("Error of the status.")
+ private BigDecimal status;
+
+ @JsonProperty("title")
+ @ApiModelProperty("Error of the title.")
+ private String title;
+
+ @JsonProperty("type")
+ @ApiModelProperty("Error of the type.")
+ private String type;
+}
diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
index ac71e4e2d1..8ae586609d 100644
--- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
+++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
@@ -24,6 +24,7 @@ public enum ScheduleEngineType {
NONE("None"),
QUARTZ("Quartz"),
+ AIRFLOW("Airflow"),
DOLPHINSCHEDULER("DolphinScheduler");
private final String type;
diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirFlowAPIConstant.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirFlowAPIConstant.java
new file mode 100644
index 0000000000..e328d8fd0a
--- /dev/null
+++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirFlowAPIConstant.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow;
+
+/**
+ * Contains constants for interacting with the Airflow API.
+ */
+public class AirFlowAPIConstant {
+
+ public static final String DEFAULT_TIMEZONE = "Asia/Shanghai";
+    public static final String INLONG_OFFLINE_DAG_TASK_PREFIX = "inlong_offline_task_";
+    public static final String SUBMIT_OFFLINE_JOB_URI = "/inlong/manager/api/group/submitOfflineJob";
+
+ // AirflowConnection
+ public static final String LIST_CONNECTIONS_URI = "/api/v1/connections";
+    public static final String GET_CONNECTION_URI = "/api/v1/connections/{connection_id}";
+
+ // DAG
+ public static final String LIST_DAGS_URI = "/api/v1/dags";
+ public static final String UPDATE_DAG_URI = "/api/v1/dags/{dag_id}";
+
+ // DAGRun
+    public static final String TRIGGER_NEW_DAG_RUN_URI = "/api/v1/dags/{dag_id}/dagRuns";
+
+}
diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleClient.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleClient.java
new file mode 100644
index 0000000000..bbb8e59149
--- /dev/null
+++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleClient.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.schedule.ScheduleEngineClient;
+import org.apache.inlong.manager.schedule.ScheduleEngineType;
+
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+
+/**
+ * Built-in implementation of schedule engine client corresponding with {@link AirflowScheduleEngine}.
+ * AirflowScheduleClient simply invokes the {@link AirflowScheduleEngine} to register/unregister/update
+ * schedule info instead of calling a remote schedule service.
+ * */
+@Service
+public class AirflowScheduleClient implements ScheduleEngineClient {
+
+ @Resource
+ public AirflowScheduleEngine scheduleEngine;
+
+ @Override
+ public boolean accept(String engineType) {
+        return ScheduleEngineType.AIRFLOW.getType().equalsIgnoreCase(engineType);
+ }
+
+ @Override
+ public boolean register(ScheduleInfo scheduleInfo) {
+ return scheduleEngine.handleRegister(scheduleInfo);
+ }
+
+ @Override
+ public boolean unregister(String groupId) {
+ return scheduleEngine.handleUnregister(groupId);
+ }
+
+ @Override
+ public boolean update(ScheduleInfo scheduleInfo) {
+ return scheduleEngine.handleUpdate(scheduleInfo);
+ }
+}
diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java
new file mode 100644
index 0000000000..792307e6ae
--- /dev/null
+++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow;
+
+import org.apache.inlong.common.bounded.BoundaryType;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.pojo.schedule.airflow.AirflowConnection;
+import org.apache.inlong.manager.pojo.schedule.airflow.DAG;
+import org.apache.inlong.manager.pojo.schedule.airflow.DAGCollection;
+import org.apache.inlong.manager.pojo.schedule.airflow.DAGRun;
+import org.apache.inlong.manager.pojo.schedule.airflow.DAGRunConf;
+import org.apache.inlong.manager.schedule.ScheduleEngine;
+import org.apache.inlong.manager.schedule.ScheduleUnit;
+import org.apache.inlong.manager.schedule.airflow.api.AirflowResponse;
+import org.apache.inlong.manager.schedule.airflow.api.connection.AirflowConnectionCreator;
+import org.apache.inlong.manager.schedule.airflow.api.connection.AirflowConnectionGetter;
+import org.apache.inlong.manager.schedule.airflow.api.dag.DAGCollectionUpdater;
+import org.apache.inlong.manager.schedule.airflow.api.dag.DAGDeletor;
+import org.apache.inlong.manager.schedule.airflow.api.dag.DAGUpdater;
+import org.apache.inlong.manager.schedule.airflow.api.dagruns.DAGRunsTrigger;
+import org.apache.inlong.manager.schedule.airflow.config.AirflowConfig;
+import org.apache.inlong.manager.schedule.airflow.util.DAGUtil;
+import org.apache.inlong.manager.schedule.airflow.util.DateUtil;
+import org.apache.inlong.manager.schedule.exception.AirflowScheduleException;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.mina.util.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant.DEFAULT_TIMEZONE;
+import static org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant.INLONG_OFFLINE_DAG_TASK_PREFIX;
+import static org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant.SUBMIT_OFFLINE_JOB_URI;
+import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.DAG_DUPLICATE;
+import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.INIT_CONNECTION_FAILED;
+import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.SCHEDULE_ENGINE_SHUTDOWN_FAILED;
+import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.SCHEDULE_TASK_REGISTER_FAILED;
+import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.SCHEDULE_TASK_UPDATE_FAILED;
+import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.TASK_DAG_SWITCH_FAILED;
+
+/**
+ * Response for processing the start/register/unregister/update/stop requests from {@link AirflowScheduleClient}
+ */
+@Service
+public class AirflowScheduleEngine implements ScheduleEngine {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AirflowScheduleEngine.class);
+ private final Set<String> scheduledJobSet = new ConcurrentHashSet<>();
+ private AirflowServerClient serverClient;
+ private AirflowConfig airflowConfig;
+
+    public AirflowScheduleEngine(AirflowServerClient serverClient, AirflowConfig airflowConfig) {
+ this.serverClient = serverClient;
+ this.airflowConfig = airflowConfig;
+ start();
+ }
+
+ @Override
+ public void start() {
+ try {
+            // Create authentication information for the Inlong Manger API used by AirFlow
+ initConnection();
+ // Check if DagCleaner and DagCreator exist and unpause them
+ switchOriginalDAG(false);
+            // Start all task DAGs and load all DAG ID(Group Id) into the local cache
+ switchAllTaskDAG(false);
+ LOGGER.info("Airflow initialization succeeded.");
+ } catch (Exception e) {
+ LOGGER.error("Airflow initialization failed.", e);
+ }
+ }
+
+ private void initConnection() throws Exception {
+        LOGGER.info("Initializing Inlong Manager AirflowConnection for Airflow ... ");
+ // Check if Airflow has the Inlong AirflowConnection
+ AirflowResponse<AirflowConnection> response = serverClient.sendRequest(
+ new AirflowConnectionGetter(airflowConfig.getConnectionId()));
+ if (!response.isSuccess()) {
+            AirflowConnection newConn = new AirflowConnection(airflowConfig.getConnectionId(), "HTTP", "",
+                    airflowConfig.getHost(), airflowConfig.getInlongUsername(), SUBMIT_OFFLINE_JOB_URI,
+                    airflowConfig.getPort(), airflowConfig.getInlongPassword(), "");
+            response = serverClient.sendRequest(new AirflowConnectionCreator(newConn));
+            LOGGER.info("AirflowConnection registration response: {}", response.toString());
+            if (!response.isSuccess()) {
+                LOGGER.error("Initialization connection failed.");
+                throw new AirflowScheduleException(INIT_CONNECTION_FAILED, "Initialization connection failed.");
+ }
+ }
+ }
+
+ private void switchOriginalDAG(boolean isPaused) {
+        for (String dagId : Arrays.asList(airflowConfig.getDagCleanerId(), airflowConfig.getDagCreatorId())) {
+            try {
+                AirflowResponse<DAG> response = serverClient.sendRequest(new DAGUpdater(dagId, isPaused));
+                LOGGER.info("Response to {} the original DAG : {}", isPaused ? "stop" : "start", response.toString());
+                if (!response.isSuccess()) {
+                    throw new AirflowScheduleException(TASK_DAG_SWITCH_FAILED,
+                            String.format("%s does not exist or failed to %s.", dagId, (isPaused ? "stop" : "start")));
+                }
+            } catch (Exception e) {
+                LOGGER.error("The original DAG {} failed.", isPaused ? "stop" : "start", e);
+                throw new AirflowScheduleException(TASK_DAG_SWITCH_FAILED,
+                        String.format("The original DAG %s failed: %s.", isPaused ? "stop" : "start", e.getMessage()));
+ }
+ }
+ }
+
+ private void switchAllTaskDAG(boolean isPaused) {
+ try {
+ AirflowResponse<DAGCollection> response = serverClient.sendRequest(
+                    new DAGCollectionUpdater(INLONG_OFFLINE_DAG_TASK_PREFIX, isPaused));
+            LOGGER.info("Response to {} task DAG : {}", isPaused ? "stop" : "start", response.toString());
+            if (!response.isSuccess()) {
+                throw new AirflowScheduleException(TASK_DAG_SWITCH_FAILED,
+                        String.format("Failed to %s task DAGs.", isPaused ? "stop" : "start"));
+            }
+            if (!isPaused) {
+                List<DAG> dagList = response.getData().getDags();
+                if (dagList != null) {
+                    dagList.forEach(dag -> scheduledJobSet
+                            .add(dag.getDagId().substring(INLONG_OFFLINE_DAG_TASK_PREFIX.length() - 1)));
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.error("Failed to {} task DAGs.", isPaused ? "stop" : "start", e);
+            throw new AirflowScheduleException(TASK_DAG_SWITCH_FAILED,
+                    String.format("Failed to %s task DAGs: %s", isPaused ? "stop" : "start", e.getMessage()));
+ }
+ }
+
+ @Override
+ public boolean handleRegister(ScheduleInfo scheduleInfo) {
+ try {
+            LOGGER.info("Registering DAG for {}", scheduleInfo.getInlongGroupId());
+            return doRegister(scheduleInfo, true);
+        } catch (Exception e) {
+            LOGGER.error("The Airflow scheduling task with Group ID {} failed to register.",
+                    scheduleInfo.getInlongGroupId(), e);
+            throw new AirflowScheduleException(SCHEDULE_TASK_REGISTER_FAILED,
+                    String.format("The Airflow scheduling task with Group ID %s failed to register: %s",
+ scheduleInfo.getInlongGroupId(), e.getMessage()));
+ }
+ }
+
+ @Override
+ public boolean handleUnregister(String groupId) {
+ LOGGER.info("Unregistering Airflow Dag with GroupId {} ", groupId);
+ if (scheduledJobSet.contains(groupId)) {
+ try {
+ if (!completelyDelete(DAGUtil.buildDAGIdByGroupId(groupId))) {
+ return false;
+ }
+ } catch (Exception e) {
+ LOGGER.warn("May not be completely removed {}", groupId, e);
+ }
+ }
+ scheduledJobSet.remove(groupId);
+ LOGGER.info("Un-registered airflow schedule info for {}", groupId);
+ return true;
+ }
+
+ private boolean completelyDelete(String groupId) throws Exception {
+ // Trigger the removal of the DAG file for the Cleaner DAG
+ DAGRunConf dagRunConf = DAGRunConf.builder()
+ .inlongGroupId(DAGUtil.buildDAGIdByGroupId(groupId)).build();
+ AirflowResponse<DAGRun> response = serverClient.sendRequest(
+                new DAGRunsTrigger(airflowConfig.getDagCleanerId(), ImmutableMap.of("conf", dagRunConf)));
+        LOGGER.info("Response to DAG file clearing: {}", response.toString());
+        if (!response.isSuccess()) {
+            LOGGER.warn("Failed to delete DAG file corresponding to {}.", groupId);
+            return false;
+        }
+        // Delete DAG tasks that have been loaded into memory
+        AirflowResponse<Object> deleteResponse = serverClient.sendRequest(new DAGDeletor(groupId));
+        LOGGER.info("Response to DAG scheduling instance clearing: {}", deleteResponse.toString());
+        if (!deleteResponse.isSuccess()) {
+            LOGGER.warn("Failed to delete DAG instance corresponding to {}.", groupId);
+ }
+ return deleteResponse.isSuccess();
+ }
+
+ @Override
+ public boolean handleUpdate(ScheduleInfo scheduleInfo) {
+ try {
+            LOGGER.info("Updating DAG for {}", scheduleInfo.getInlongGroupId());
+            return doRegister(scheduleInfo, false);
+        } catch (Exception e) {
+            LOGGER.error("The Airflow scheduling task with Group ID {} failed to update.",
+                    scheduleInfo.getInlongGroupId(), e);
+            throw new AirflowScheduleException(SCHEDULE_TASK_UPDATE_FAILED,
+                    String.format("The Airflow scheduling task with Group ID %s failed to update: %s",
+ scheduleInfo.getInlongGroupId(), e.getMessage()));
+ }
+ }
+
+    public boolean doRegister(ScheduleInfo scheduleInfo, boolean isFirst) throws Exception {
+        if (isFirst && scheduledJobSet.contains(scheduleInfo.getInlongGroupId())) {
+            throw new AirflowScheduleException(DAG_DUPLICATE,
+                    String.format("Group %s is already registered", scheduleInfo.getInlongGroupId()));
+        }
+        DAGRunConf.DAGRunConfBuilder confBuilder = DAGRunConf.builder()
+                .inlongGroupId(scheduleInfo.getInlongGroupId())
+                .startTime(scheduleInfo.getStartTime().getTime())
+                .endTime(scheduleInfo.getEndTime().getTime())
+                .boundaryType(BoundaryType.TIME.getType())
+                .connectionId(airflowConfig.getConnectionId())
+                .timezone(DEFAULT_TIMEZONE);
+        if (scheduleInfo.getScheduleType() == 1) {
+            confBuilder = confBuilder.cronExpr(scheduleInfo.getCrontabExpression());
+        } else {
+            confBuilder = confBuilder.secondsInterval(DateUtil.intervalToSeconds(scheduleInfo.getScheduleInterval(),
+                    scheduleInfo.getScheduleUnit()))
+                    .startTime(ScheduleUnit.getScheduleUnit(scheduleInfo.getScheduleUnit()) == ScheduleUnit.ONE_ROUND
+                            ? scheduleInfo.getEndTime().getTime()
+                            : scheduleInfo.getStartTime().getTime());
+        }
+        DAGRunConf dagRunConf = confBuilder.build();
+        AirflowResponse<DAGRun> response = serverClient.sendRequest(
+                new DAGRunsTrigger(airflowConfig.getDagCreatorId(), ImmutableMap.of("conf", dagRunConf)));
+        LOGGER.info("DAG {} response: {}", isFirst ? "registration" : "update", response.toString());
+ if (response.isSuccess()) {
+ scheduledJobSet.add(scheduleInfo.getInlongGroupId());
+ }
+ return response.isSuccess();
+ }
+
+ @Override
+ public void stop() {
+ try {
+ switchOriginalDAG(true);
+ switchAllTaskDAG(true);
+ } catch (Exception e) {
+ LOGGER.error("Airflow Schedule Engine shutdown failed: ", e);
+ throw new AirflowScheduleException(SCHEDULE_ENGINE_SHUTDOWN_FAILED,
+                    String.format("Airflow Schedule Engine shutdown failed: %s", e.getMessage()));
+ }
+ }
+}
diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowServerClient.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowServerClient.java
new file mode 100644
index 0000000000..be67a36475
--- /dev/null
+++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowServerClient.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow;
+
+import org.apache.inlong.manager.pojo.schedule.airflow.Error;
+import org.apache.inlong.manager.schedule.airflow.api.AirflowApi;
+import org.apache.inlong.manager.schedule.airflow.api.AirflowResponse;
+import org.apache.inlong.manager.schedule.airflow.config.AirflowConfig;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A unified class used for Airflow RESTful API processing.
+ */
+public class AirflowServerClient {
+
+    private static final Logger logger = LoggerFactory.getLogger(AirflowServerClient.class);
+ private final OkHttpClient httpClient;
+ private final AirflowConfig config;
+ private final ObjectMapper objectMapper;
+
+ public AirflowServerClient(OkHttpClient httpClient, AirflowConfig config) {
+ this.httpClient = httpClient;
+ this.config = config;
+ this.objectMapper = new ObjectMapper();
+ }
+
+ /**
+ * Send request and parse response
+ *
+ * @param apiEndpoint apiEndpoint
+ * @param <T> Response to Generic Types
+ * @return Parsed response object
+ * @throws IOException Network request exception
+ */
+    public <T> AirflowResponse<T> sendRequest(AirflowApi<T> apiEndpoint) throws IOException {
+ Request request = apiEndpoint.buildRequest(config.getBaseUrl());
+ try (Response response = httpClient.newCall(request).execute()) {
+ String responseBody = response.body().string();
+ if (response.isSuccessful()) {
+                return new AirflowResponse<>(true, objectMapper.readValue(responseBody, apiEndpoint.getResponseType()));
+            } else {
+                logger.error("Airflow Web API Request failed, status code: {} , detail: {}",
+                        response.code(), objectMapper.readValue(responseBody, Error.class).getDetail());
+ return new AirflowResponse<>(false, null);
+ }
+ }
+ }
+}
diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowApi.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowApi.java
new file mode 100644
index 0000000000..4ff1a3284d
--- /dev/null
+++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowApi.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow.api;
+
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import org.springframework.http.HttpMethod;
+
+import java.util.Map;
+/**
+ * Represents a generic interface for defining and constructing API requests to interact with Airflow.
+ * This interface provides methods for specifying HTTP methods, endpoint paths, parameters,
+ * request bodies, and constructing complete requests.
+ * @param <T> the type of the response expected from the API, allowing flexibility for various response types.
+ */
+public interface AirflowApi<T> {
+
+ /**
+ * Get HTTP Method
+ * @return HTTP Method
+ */
+ HttpMethod getMethod();
+
+ /**
+ * Get the requested path (relative to baseUrl)
+ * @return Request path
+ */
+ String getPath();
+
+ /**
+     * Get path parameters to replace placeholders in the path (e.g. : "/api/v1/dags/{dag_id}/dagRuns")
+ * @return Path parameter map
+ */
+ Map<String, String> getPathParams();
+
+ /**
+ * Get query parameters (e.g. "?Key=value")
+ * @return GET parameter map
+ */
+ Map<String, Object> getQueryParams();
+
+ /**
+ * Get the request body (applicable to methods such as POST, PUT, etc.)
+ * @return Post RequestBody Object
+ */
+ RequestBody getRequestBody();
+
+ /**
+ * Constructing a complete Request object
+ * @param baseUrl Base URL
+ * @return Constructed Request object
+ */
+ Request buildRequest(String baseUrl);
+
+ /**
+ * Returns the type of the response expected from this method.
+ * @return The expected response type.
+ */
+ Class<T> getResponseType();
+}
diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowResponse.java
similarity index 51%
copy from inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
copy to inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowResponse.java
index ac71e4e2d1..60e0ef6366 100644
--- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
+++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/AirflowResponse.java
@@ -15,20 +15,35 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.schedule;
+package org.apache.inlong.manager.schedule.airflow.api;
-import lombok.Getter;
+/**
+ * A generic response wrapper for handling responses from Airflow services.
+ * @param <T> the type of data included in the response, allowing flexibility for various data types.
+ */
+public class AirflowResponse<T> {
-@Getter
-public enum ScheduleEngineType {
+ private final boolean success;
+ private final T data;
- NONE("None"),
- QUARTZ("Quartz"),
- DOLPHINSCHEDULER("DolphinScheduler");
+ public AirflowResponse(boolean success, T data) {
+ this.success = success;
+ this.data = data;
+ }
- private final String type;
+ public boolean isSuccess() {
+ return success;
+ }
+
+ public T getData() {
+ return data;
+ }
- ScheduleEngineType(String type) {
- this.type = type;
+ @Override
+ public String toString() {
+ return "AirflowResponse{" +
+ "success=" + success +
+ ", data=" + data +
+ '}';
}
-}
\ No newline at end of file
+}
diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/BaseAirflowApi.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/BaseAirflowApi.java
new file mode 100644
index 0000000000..18a1ed5206
--- /dev/null
+++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/BaseAirflowApi.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow.api;
+
+import org.apache.inlong.manager.schedule.exception.AirflowScheduleException;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.MediaType;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import org.springframework.http.HttpMethod;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.BUILD_REQUEST_BODY_FAILED;
+
+/**
+ * The basic implementation of Airflow API interface.
+ *
+ * @param <T> the type of the response expected from the API, allowing flexibility for various response types.
+ */
+
+@Slf4j
+public abstract class BaseAirflowApi<T> implements AirflowApi<T> {
+
+ protected static final ObjectMapper objectMapper = new ObjectMapper();
+ protected Map<String, String> pathParams = Maps.newHashMap();
+ protected Map<String, Object> queryParams = Maps.newHashMap();
+ protected Map<String, Object> requestBodyParams = Maps.newHashMap();
+
+ @Override
+ public abstract HttpMethod getMethod();
+
+ @Override
+ public abstract String getPath();
+
+ @Override
+ public abstract Class<T> getResponseType();
+
+ @Override
+ public Map<String, String> getPathParams() {
+ return pathParams;
+ }
+
+ @Override
+ public Map<String, Object> getQueryParams() {
+ return queryParams;
+ }
+
+ /**
+ * Create JSON request body
+ * @return RequestBody Object
+ */
+ @Override
+ public RequestBody getRequestBody() {
+ try {
+            return RequestBody.create(MediaType.parse("application/json; charset=utf-8"),
+                    objectMapper.writeValueAsString(requestBodyParams));
+        } catch (Exception e) {
+            log.error("Airflow request body construction failed: {}", e.getMessage(), e);
+            throw new AirflowScheduleException(BUILD_REQUEST_BODY_FAILED,
+                    String.format("Airflow request body construction failed: %s", e.getMessage()));
+ }
+ }
+
+ @Override
+ public Request buildRequest(String baseUrl) {
+ // Build a complete URL
+ String path = buildPathParams(getPath(), getPathParams());
+ String url = baseUrl + path;
+
+ // Add query parameters
+ if (!getQueryParams().isEmpty()) {
+ String queryString = buildQueryString(getQueryParams());
+ url += "?" + queryString;
+ }
+
+ // Build Request Builder
+ Request.Builder builder = new Request.Builder().url(url);
+
+ // Set requests based on HTTP methods
+ switch (getMethod()) {
+ case GET:
+ builder.get();
+ break;
+ case POST:
+ builder.post(getRequestBody());
+ break;
+ case PATCH:
+ builder.patch(getRequestBody());
+ break;
+ case PUT:
+ builder.put(getRequestBody());
+ break;
+ case DELETE:
+ if (!requestBodyParams.isEmpty()) {
+ builder.delete(getRequestBody());
+ } else {
+ builder.delete();
+ }
+ break;
+ default:
+                throw new IllegalArgumentException("Unsupported HTTP method: " + getMethod());
+ }
+ return builder.build();
+ }
+
+    private String buildPathParams(String path, Map<String, String> pathParams) {
+ for (Map.Entry<String, String> entry : pathParams.entrySet()) {
+ path = path.replace("{" + entry.getKey() + "}", entry.getValue());
+ }
+ return path;
+ }
+
+ private String buildQueryString(Map<String, Object> queryParams) {
+ StringBuilder sb = new StringBuilder();
+        // Multiple values can be specified for the same parameter name in the Get parameter.
+ // (e.g. "?Key=value1&Key=value2")
+ queryParams.forEach((key, value) -> {
+ if (value instanceof List) {
+                ((List) value).forEach(item -> sb.append(key).append("=").append(item).append("&"));
+ } else {
+ sb.append(key).append("=").append(value).append("&");
+ }
+ });
+ if (sb.length() > 0) {
+ sb.setLength(sb.length() - 1);
+ }
+ return sb.toString();
+ }
+}
diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionCreator.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionCreator.java
new file mode 100644
index 0000000000..1e5b7d737c
--- /dev/null
+++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionCreator.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow.api.connection;
+
+import org.apache.inlong.manager.pojo.schedule.airflow.AirflowConnection;
+import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant;
+import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi;
+import org.apache.inlong.manager.schedule.exception.AirflowScheduleException;
+
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.MediaType;
+import okhttp3.RequestBody;
+import org.springframework.http.HttpMethod;
+
+import java.util.Map;
+
+import static org.apache.inlong.manager.schedule.exception.AirflowScheduleException.AirflowErrorCode.BUILD_REQUEST_BODY_FAILED;
+
+/**
+ * Build call for AirflowConnectionGetter<br>
+ * <table border="10">
+ * <tr><th> Request Body Param </th><th> Description </th></tr>
+ * <tr><td> connection_id </td><td> The connection ID. </td></tr>
+ * <tr><td> conn_type </td><td> The connection type. </td></tr>
+ * <tr><td> description </td><td> The description of the connection. </td></tr>
+ * <tr><td> host </td><td> Host of the connection. </td></tr>
+ * <tr><td> login </td><td> Login of the connection. </td></tr>
+ * <tr><td> schema </td><td> Schema of the connection. </td></tr>
+ * <tr><td> port </td><td> Port of the connection. </td></tr>
+ * <tr><td> password </td><td> Password of the connection. </td></tr>
+ * <tr><td> extra </td><td> Other values that cannot be put into another field, e.g. RSA keys.(optional) </td></tr>
+ * </table>
+ *
+ * @http.response.details <table summary="Response Details" border="1">
+ * <tr><th> Status Code </th><th> Description </th></tr>
+ * <tr><td> 200 </td><td> Success. </td></tr>
+ * <tr><td> 400 </td><td> Client specified an invalid argument. </td></tr>
+ * <tr><td> 401 </td><td> Request not authenticated due to missing, invalid, authentication info. </td></tr>
+ * <tr><td> 403 </td><td> Client does not have sufficient permission. </td></tr>
+ * </table>
+ */
+@Slf4j
+public class AirflowConnectionCreator extends BaseAirflowApi<AirflowConnection> {
+
+ AirflowConnection connection = null;
+
+ public AirflowConnectionCreator(AirflowConnection connection) {
+ this.connection = connection;
+ }
+
+ public AirflowConnectionCreator(Map<String, Object> requestBodyParams) {
+ this.requestBodyParams = requestBodyParams;
+ }
+
+ @Override
+ public RequestBody getRequestBody() {
+ if (connection != null) {
+ try {
+                return RequestBody.create(MediaType.parse("application/json; charset=utf-8"),
+                        objectMapper.writeValueAsString(connection));
+            } catch (Exception e) {
+                log.error("Airflow request body construction failed: {}", e.getMessage(), e);
+                throw new AirflowScheduleException(BUILD_REQUEST_BODY_FAILED,
+                        String.format("Airflow request body construction failed: %s", e.getMessage()));
+ }
+ }
+ return super.getRequestBody();
+ }
+
+ @Override
+ public Class<AirflowConnection> getResponseType() {
+ return AirflowConnection.class;
+ }
+
+ @Override
+ public HttpMethod getMethod() {
+ return HttpMethod.POST;
+ }
+
+ @Override
+ public String getPath() {
+ return AirFlowAPIConstant.LIST_CONNECTIONS_URI;
+ }
+}
diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionGetter.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionGetter.java
new file mode 100644
index 0000000000..7dc278ae0b
--- /dev/null
+++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/connection/AirflowConnectionGetter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow.api.connection;
+
+import org.apache.inlong.manager.pojo.schedule.airflow.AirflowConnection;
+import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant;
+import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi;
+
+import org.springframework.http.HttpMethod;
+
+/**
+ * Build call for AirflowConnectionGetter<br>
+ * <table border="10">
+ * <tr><th> Path Param </th><th> Description </th></tr>
+ * <tr><td> connection_id </td><td> The connection ID. </td></tr>
+ * </table>
+ *
+ * @http.response.details <table summary="Response Details" border="1">
+ * <tr><th> Status Code </th><th> Description </th></tr>
+ * <tr><td> 200 </td><td> Success. </td></tr>
+ * <tr><td> 401 </td><td> Request not authenticated due to missing, invalid, authentication info. </td></tr>
+ * <tr><td> 403 </td><td> Client does not have sufficient permission. </td></tr>
+ * <tr><td> 404 </td><td> A specified resource is not found. </td></tr>
+ * </table>
+ */
+public class AirflowConnectionGetter extends BaseAirflowApi<AirflowConnection> {
+
+ public AirflowConnectionGetter(String connectionId) {
+ pathParams.put("connection_id", connectionId);
+ }
+
+ @Override
+ public HttpMethod getMethod() {
+ return HttpMethod.GET;
+ }
+
+ @Override
+ public String getPath() {
+ return AirFlowAPIConstant.GET_CONNECTION_URI;
+ }
+
+ @Override
+ public Class<AirflowConnection> getResponseType() {
+ return AirflowConnection.class;
+ }
+}
diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGCollectionUpdater.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGCollectionUpdater.java
new file mode 100644
index 0000000000..039a18d9d8
--- /dev/null
+++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGCollectionUpdater.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow.api.dag;
+
+import org.apache.inlong.manager.pojo.schedule.airflow.DAGCollection;
+import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant;
+import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi;
+
+import org.springframework.http.HttpMethod;
+
+import java.util.Map;
+
+/**
+ * Build call for DAGCollectionUpdater< br>
+ * <table border="10">
+ * <tr><th> GET Param </th><th> Description </th></tr>
+ * <tr><td> limit </td><td> The numbers of items to return. (optional, default to 100) </td></tr>
+ * <tr><td> offset </td><td> The number of items to skip before starting to collect the result set. (optional) </td></tr>
+ * <tr><td> tags </td><td> List of tags to filter results.(optional) </td></tr>
+ * <tr><td> update_mask </td><td> The fields to update on the resource. If absent or empty, all modifiable fields are updated. A comma-separated list of fully qualified names of fields.(optional) </td></tr>
+ * <tr><td> only_active </td><td> Only filter active DAGs. (optional, default to true) </td></tr>
+ * <tr><td> dag_id_pattern </td><td> If set, only return DAGs with dag_ids matching this pattern. (required) </td></tr>
+ * </table>
+ *
+ * <table border="10">
+ * <tr><th> Request Body Param </th><th> Description </th></tr>
+ * <tr><td> is_paused </td><td> Whether the DAG is paused. </td></tr>
+ * </table>
+ *
+ * @http.response.details <table summary="Response Details" border="1">
+ * <tr><th> Status Code </th><th> Description </th></tr>
+ * <tr><td> 200 </td><td> Success. </td></tr>
+ * <tr><td> 401 </td><td> Request not authenticated due to missing or invalid authentication info. </td></tr>
+ * <tr><td> 403 </td><td> Client does not have sufficient permission. </td></tr>
+ * <tr><td> 404 </td><td> A specified resource is not found. </td></tr>
+ * </table>
+ */
+public class DAGCollectionUpdater extends BaseAirflowApi<DAGCollection> {
+
+ public DAGCollectionUpdater(String dagIdPattern, boolean isPaused) {
+ this.queryParams.put("dag_id_pattern", dagIdPattern);
+ this.requestBodyParams.put("is_paused", isPaused);
+ }
+
+ public DAGCollectionUpdater(Map<String, Object> queryParams, Map<String,
Object> requestBodyParams) {
+ this.queryParams = queryParams;
+ this.requestBodyParams = requestBodyParams;
+ }
+
+ @Override
+ public HttpMethod getMethod() {
+ return HttpMethod.PATCH;
+ }
+
+ @Override
+ public String getPath() {
+ return AirFlowAPIConstant.LIST_DAGS_URI;
+ }
+
+ @Override
+ public Class<DAGCollection> getResponseType() {
+ return DAGCollection.class;
+ }
+}
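For reference, a minimal sketch of how this wrapper can be assembled. The dag_id prefix matches the one used by the generated offline-task DAGs further below; how the request is ultimately executed (via AirflowServerClient) is assumed here rather than shown:

    // Sketch only: pause every DAG whose dag_id matches the InLong offline-task prefix
    // in a single PATCH call carrying the body {"is_paused": true}.
    DAGCollectionUpdater pauseAll = new DAGCollectionUpdater("inlong_offline_task_", true);
    // pauseAll.getMethod()       -> HttpMethod.PATCH
    // pauseAll.getPath()         -> AirFlowAPIConstant.LIST_DAGS_URI
    // pauseAll.getResponseType() -> DAGCollection.class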
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGDeletor.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGDeletor.java
new file mode 100644
index 0000000000..23a348d766
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGDeletor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow.api.dag;
+
+import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant;
+import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi;
+
+import org.springframework.http.HttpMethod;
+
+/**
+ * Build call for DAGDeletor<br>
+ * <table border="10">
+ * <tr><th> Path Param </th><th> Description </th></tr>
+ * <tr><td> dag_id </td><td> The DAG ID. </td></tr>
+ * </table>
+ *
+ * @http.response.details <table summary="Response Details" border="1">
+ * <tr><th> Status Code </th><th> Description </th></tr>
+ * <tr><td> 200 </td><td> Success. </td></tr>
+ * <tr><td> 401 </td><td> Request not authenticated due to missing or invalid authentication info. </td></tr>
+ * <tr><td> 403 </td><td> Client does not have sufficient permission.
</td></tr>
+ * <tr><td> 404 </td><td> A specified resource is not found. </td></tr>
+ * </table>
+ */
+public class DAGDeletor extends BaseAirflowApi<Object> {
+
+ public DAGDeletor(String dagId) {
+ this.pathParams.put("dag_id", dagId);
+    }
+
+    @Override
+ public HttpMethod getMethod() {
+ return HttpMethod.DELETE;
+ }
+
+ @Override
+ public String getPath() {
+ return AirFlowAPIConstant.UPDATE_DAG_URI;
+ }
+
+ @Override
+ public Class<Object> getResponseType() {
+ return Object.class;
+ }
+}
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGUpdater.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGUpdater.java
new file mode 100644
index 0000000000..be8313f1b1
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dag/DAGUpdater.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow.api.dag;
+
+import org.apache.inlong.manager.pojo.schedule.airflow.DAG;
+import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant;
+import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi;
+
+import org.springframework.http.HttpMethod;
+
+/**
+ * Build call for DAGUpdater<br>
+ * <table border="10">
+ * <tr><th> Path Param </th><th> Description </th></tr>
+ * <tr><td> dag_id </td><td> The DAG ID. </td></tr>
+ * </table>
+ *
+ * <table border="10">
+ * <tr><th> GET Param </th><th> Description </th></tr>
+ * <tr><td> update_mask </td><td> The fields to update on the resource. If absent or empty, all modifiable fields are updated. A comma-separated list of fully qualified names of fields. (optional) </td></tr>
+ * </table>
+ *
+ * <table border="10">
+ * <tr><th> Request Body Param </th><th> Description </th></tr>
+ * <tr><td> is_paused </td><td> Whether the DAG is paused. </td></tr>
+ * </table>
+ *
+ * @http.response.details <table summary="Response Details" border="1">
+ * <tr><th> Status Code </th><th> Description </th></tr>
+ * <tr><td> 200 </td><td> Success. </td></tr>
+ * <tr><td> 401 </td><td> Request not authenticated due to missing or invalid authentication info. </td></tr>
+ * <tr><td> 403 </td><td> Client does not have sufficient permission.
</td></tr>
+ * <tr><td> 404 </td><td> A specified resource is not found. </td></tr>
+ * </table>
+ */
+public class DAGUpdater extends BaseAirflowApi<DAG> {
+
+ public DAGUpdater(String dagId, boolean isPaused) {
+ this.pathParams.put("dag_id", dagId);
+ this.requestBodyParams.put("is_paused", isPaused);
+ }
+
+ public DAGUpdater(String dagId, String updateMask, boolean isPaused) {
+ this.pathParams.put("dag_id", dagId);
+ this.queryParams.put("update_mask", updateMask);
+ this.requestBodyParams.put("is_paused", isPaused);
+ }
+
+ @Override
+ public HttpMethod getMethod() {
+ return HttpMethod.PATCH;
+ }
+
+ @Override
+ public String getPath() {
+ return AirFlowAPIConstant.UPDATE_DAG_URI;
+ }
+
+ @Override
+ public Class<DAG> getResponseType() {
+ return DAG.class;
+ }
+}
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dagruns/DAGRunsTrigger.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dagruns/DAGRunsTrigger.java
new file mode 100644
index 0000000000..b9fe7b2260
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/api/dagruns/DAGRunsTrigger.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow.api.dagruns;
+
+import org.apache.inlong.manager.pojo.schedule.airflow.DAGRun;
+import org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant;
+import org.apache.inlong.manager.schedule.airflow.api.BaseAirflowApi;
+
+import org.springframework.http.HttpMethod;
+
+import java.util.Map;
+
+/**
+ * Build call for DAGRunsTrigger <br>
+ * <table border="10">
+ * <tr><th> Path Param </th><th> Description </th></tr>
+ * <tr><td> dag_id </td><td> The DAG ID. </td></tr>
+ * </table>
+ *
+ * <table border="10">
+ * <tr><th> Request Body Param </th><th> Description </th></tr>
+ * <tr>
+ * <td> conf </td>
+ * <td>
+ * JSON object describing additional configuration parameters. <br>
+ * The value of this field can be set only when creating the object.
If you try to modify the field of an existing object, the request fails with a BAD_REQUEST error.<br>
+ * </td>
+ * </tr>
+ * <tr>
+ * <td> dag_run_id </td>
+ * <td> Run ID. <br>
+ * The value of this field can be set only when creating the object.
If you try to modify the field of an existing object, the request fails with a BAD_REQUEST error. <br>
+ * If not provided, a value will be generated based on execution_date.
<br>
+ * If the specified dag_run_id is in use, the creation request fails
with an ALREADY_EXISTS error. <br>
+ *             This, together with DAG_ID, forms a unique key.<br>
+ * </td>
+ * </tr>
+ * <tr><td> data_interval_end </td><td> The end of the interval the DAG run
covers. </td></tr>
+ * <tr><td> data_interval_start </td><td> The beginning of the interval the
DAG run covers. </td></tr>
+ * <tr>
+ * <td> logical_date </td>
+ * <td>
+ * The logical date (previously called execution date). This is the
time or interval covered by this DAG run, according to the DAG definition. <br>
+ * The value of this field can be set only when creating the object.
If you try to modify the field of an existing object, the request fails with a BAD_REQUEST error.<br>
+ *              This, together with DAG_ID, forms a unique key. <br>
+ * </td>
+ * </tr>
+ * <tr><td> note </td><td> Contains manually entered notes by the user about
the DagRun. </td></tr>
+ * </table>
+ *
+ * @http.response.details <table summary="Response Details" border="1">
+ * <tr><th> Status Code </th><th> Description </th></tr>
+ * <tr><td> 200 </td><td> Success. </td></tr>
+ * <tr><td> 401 </td><td> Request not authenticated due to missing or invalid authentication info. </td></tr>
+ * <tr><td> 403 </td><td> Client does not have sufficient permission.
</td></tr>
+ * <tr><td> 404 </td><td> A specified resource is not found. </td></tr>
+ * </table>
+ */
+public class DAGRunsTrigger extends BaseAirflowApi<DAGRun> {
+
+ public DAGRunsTrigger(String dagId) {
+ this.pathParams.put("dag_id", dagId);
+ }
+
+ public DAGRunsTrigger(String dagId, Map<String, Object> requestBodyParams)
{
+ this.pathParams.put("dag_id", dagId);
+ this.requestBodyParams = requestBodyParams;
+ }
+
+ @Override
+ public HttpMethod getMethod() {
+ return HttpMethod.POST;
+ }
+
+ @Override
+ public String getPath() {
+ return AirFlowAPIConstant.TRIGGER_NEW_DAG_RUN_URI;
+ }
+
+ @Override
+ public Class<DAGRun> getResponseType() {
+ return DAGRun.class;
+ }
+}
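To make the request-body table above concrete, a DAGRunsTrigger aimed at the dag_creator DAG could be populated as follows. The keys inside "conf" mirror the fields read by dag_creator.py later in this change; the values are illustrative only, and the surrounding AirflowServerClient call is assumed:

    Map<String, Object> conf = new HashMap<>();
    conf.put("inlong_group_id", "test_offline_1");       // illustrative value
    conf.put("connection_id", "inlong_connection");      // illustrative value
    Map<String, Object> body = new HashMap<>();
    body.put("conf", conf);
    DAGRunsTrigger trigger = new DAGRunsTrigger("dag_creator", body);
    // POSTs to AirFlowAPIConstant.TRIGGER_NEW_DAG_RUN_URI and maps the response to DAGRun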
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java
new file mode 100644
index 0000000000..60bb6673ce
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow.config;
+
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.schedule.airflow.AirflowServerClient;
+import
org.apache.inlong.manager.schedule.airflow.interceptor.AirflowAuthInterceptor;
+import
org.apache.inlong.manager.schedule.airflow.interceptor.LoggingInterceptor;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import okhttp3.OkHttpClient;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Data
+@Configuration
+@NoArgsConstructor
+@AllArgsConstructor
+@EqualsAndHashCode(callSuper = true)
+public class AirflowConfig extends ClientConfiguration {
+
+ @Value("${schedule.engine.airflow.inlong.manager.host:127.0.0.1}")
+ private String host;
+
+ @Value("${server.port:8083}")
+ private int port;
+
+ @Value("${default.admin.user:admin}")
+ private String inlongUsername;
+
+ @Value("${default.admin.password:inlong}")
+ private String inlongPassword;
+
+ @Value("${schedule.engine.airflow.connection.id:inlong_connection}")
+ private String connectionId;
+
+ @Value("${schedule.engine.airflow.cleaner.id:dag_cleaner}")
+ private String dagCleanerId;
+
+ @Value("${schedule.engine.airflow.creator.id:dag_creator}")
+ private String dagCreatorId;
+
+ @Value("${schedule.engine.airflow.username:airflow}")
+ private String airflowUsername;
+
+ @Value("${schedule.engine.airflow.password:airflow}")
+ private String airflowPassword;
+
+ @Value("${schedule.engine.airflow.baseUrl:http://localhost:8080/}")
+ private String baseUrl;
+
+ @Bean
+ public OkHttpClient okHttpClient() {
+ return new OkHttpClient.Builder()
+ .addInterceptor(new
AirflowAuthInterceptor(this.getAirflowUsername(), this.getAirflowPassword()))
+ .addInterceptor(new LoggingInterceptor())
+ .connectTimeout(this.getConnectTimeout(), this.getTimeUnit())
+ .readTimeout(this.getReadTimeout(), this.getTimeUnit())
+ .writeTimeout(this.getWriteTimeout(), this.getTimeUnit())
+ .retryOnConnectionFailure(true)
+ .build();
+    }
+
+    @Bean
+ public AirflowServerClient airflowServerClient(OkHttpClient okHttpClient,
AirflowConfig airflowConfig) {
+ return new AirflowServerClient(okHttpClient, airflowConfig);
+ }
+}
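The @Value defaults above define the property keys that the new entries in application-dev/test/prod.properties are expected to provide. A hypothetical override pointing the manager at a non-local Airflow deployment could look like:

    schedule.engine.airflow.baseUrl=http://airflow-host:8080/
    schedule.engine.airflow.username=airflow
    schedule.engine.airflow.password=airflow
    schedule.engine.airflow.connection.id=inlong_connection
    schedule.engine.airflow.cleaner.id=dag_cleaner
    schedule.engine.airflow.creator.id=dag_creator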
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/interceptor/AirflowAuthInterceptor.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/interceptor/AirflowAuthInterceptor.java
new file mode 100644
index 0000000000..714614bf94
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/interceptor/AirflowAuthInterceptor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow.interceptor;
+
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.Interceptor;
+import okhttp3.Request;
+import okhttp3.Response;
+
+import java.io.IOException;
+import java.util.Base64;
+
+/**
+ * AirflowAuthInterceptor
+ * Before OkHttp executes a request, this interceptor uniformly attaches the Basic authentication header.
+ */
+@Slf4j
+public class AirflowAuthInterceptor implements Interceptor {
+
+ // Airflow Authentication Header
+ private final String authHeader;
+
+ public AirflowAuthInterceptor(String username, String password) {
+ String credentials = username + ":" + password;
+ this.authHeader = "Basic " +
Base64.getEncoder().encodeToString(credentials.getBytes());
+ }
+
+ @Override
+ public Response intercept(Chain chain) throws IOException {
+ Request originalRequest = chain.request();
+ Request.Builder requestBuilder = originalRequest
+ .newBuilder()
+ .header("Authorization", authHeader);
+ return chain.proceed(requestBuilder.build());
+ }
+}
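A small sketch of the header this interceptor produces for the default test credentials (airflow/airflow) used elsewhere in this change, shown only to make the wire format concrete:

    String credentials = "airflow" + ":" + "airflow";
    String header = "Basic " + Base64.getEncoder()
            .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    // header -> "Basic YWlyZmxvdzphaXJmbG93", attached as the Authorization header on every call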
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/interceptor/LoggingInterceptor.java
similarity index 52%
copy from
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
copy to
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/interceptor/LoggingInterceptor.java
index ac71e4e2d1..c3028385b1 100644
---
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/interceptor/LoggingInterceptor.java
@@ -15,20 +15,28 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.schedule;
+package org.apache.inlong.manager.schedule.airflow.interceptor;
-import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.Interceptor;
+import okhttp3.Request;
+import okhttp3.Response;
-@Getter
-public enum ScheduleEngineType {
+import java.io.IOException;
- NONE("None"),
- QUARTZ("Quartz"),
- DOLPHINSCHEDULER("DolphinScheduler");
-
- private final String type;
+/**
+ * LoggingInterceptor
+ * Provides unified request logging for OkHttp.
+ */
+@Slf4j
+public class LoggingInterceptor implements Interceptor {
- ScheduleEngineType(String type) {
- this.type = type;
+ @Override
+ public Response intercept(Chain chain) throws IOException {
+ Request request = chain.request();
+ Response response = chain.proceed(request);
+ log.info("Airflow API request information - Address: {}, URI: {},
Request method: {}, Response status code: {}",
+ request.url(), request.url().uri(), request.method(),
response.code());
+ return response;
}
-}
\ No newline at end of file
+}
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/util/DAGUtil.java
similarity index 71%
copy from
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
copy to
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/util/DAGUtil.java
index ac71e4e2d1..fad05f2116 100644
---
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/util/DAGUtil.java
@@ -15,20 +15,13 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.schedule;
+package org.apache.inlong.manager.schedule.airflow.util;
-import lombok.Getter;
+import static
org.apache.inlong.manager.schedule.airflow.AirFlowAPIConstant.INLONG_OFFLINE_DAG_TASK_PREFIX;
-@Getter
-public enum ScheduleEngineType {
+public class DAGUtil {
- NONE("None"),
- QUARTZ("Quartz"),
- DOLPHINSCHEDULER("DolphinScheduler");
-
- private final String type;
-
- ScheduleEngineType(String type) {
- this.type = type;
+ public static String buildDAGIdByGroupId(String groupId) {
+ return INLONG_OFFLINE_DAG_TASK_PREFIX.concat(groupId);
}
-}
\ No newline at end of file
+}
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/util/DateUtil.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/util/DateUtil.java
new file mode 100644
index 0000000000..950e334921
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/util/DateUtil.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow.util;
+
+import org.apache.inlong.manager.schedule.ScheduleUnit;
+
+import java.math.BigInteger;
+import java.util.Objects;
+
+public class DateUtil {
+
+ public static String intervalToSeconds(long interval, String timeUnit) {
+ BigInteger seconds = new BigInteger(String.valueOf(interval));
+ String intervalStr = "";
+ switch
(Objects.requireNonNull(ScheduleUnit.getScheduleUnit(timeUnit))) {
+ case SECOND:
+ intervalStr = "1";
+ break;
+ case MINUTE:
+ intervalStr = "60";
+ break;
+ case HOUR:
+ intervalStr = "3600";
+ break;
+ case DAY:
+ intervalStr = "86400";
+ break;
+ case WEEK:
+ intervalStr = "604800";
+ break;
+ case MONTH:
+ intervalStr = "2592000";
+ break;
+ case YEAR:
+ intervalStr = "31536000";
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported time unit");
+ }
+ return seconds.multiply(new BigInteger(intervalStr)).toString();
+ }
+
+}
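In short, the method multiplies the interval count by the number of seconds in the given unit and returns the product as a string. A hedged usage sketch (the exact unit token accepted by ScheduleUnit.getScheduleUnit is assumed, not confirmed here):

    // "H" is an assumed token for the HOUR unit; ScheduleUnit defines the accepted names.
    String seconds = DateUtil.intervalToSeconds(2, "H");   // HOUR branch: 2 * 3600
    // seconds -> "7200", e.g. usable as a seconds-based interval for the generated DAGs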
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/AirflowScheduleException.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/AirflowScheduleException.java
new file mode 100644
index 0000000000..6b6830fb30
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/AirflowScheduleException.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.exception;
+
+/**
+ * Represents exceptions specific to the Airflow scheduling process.
+ * Each exception is associated with a specific error code for better
identification.
+ */
+public class AirflowScheduleException extends RuntimeException {
+
+ /**
+ * Enum to define all error codes associated with Airflow scheduling
exceptions.
+ */
+ public enum AirflowErrorCode {
+ INIT_CONNECTION_FAILED,
+ TASK_DAG_SWITCH_FAILED,
+ SCHEDULE_TASK_REGISTER_FAILED,
+ SCHEDULE_TASK_UPDATE_FAILED,
+ SCHEDULE_ENGINE_SHUTDOWN_FAILED,
+ BUILD_REQUEST_BODY_FAILED,
+ DAG_DUPLICATE
+ }
+
+ private AirflowErrorCode errorCode;
+
+ public AirflowScheduleException(String message) {
+ super(message);
+    }
+
+    public AirflowScheduleException(AirflowErrorCode errorCode, String
message) {
+ super(message);
+ this.errorCode = errorCode;
+ }
+
+ public AirflowScheduleException(AirflowErrorCode errorCode, String
message, Throwable cause) {
+ super(message, cause);
+ this.errorCode = errorCode;
+ }
+
+ public AirflowErrorCode getErrorCode() {
+ return errorCode;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("ErrorCode: %s, Message: %s", errorCode,
getMessage());
+ }
+}
diff --git
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowContainerEnv.java
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowContainerEnv.java
new file mode 100644
index 0000000000..f99282a2ed
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowContainerEnv.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow;
+
+import org.apache.inlong.manager.schedule.exception.AirflowScheduleException;
+
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.testcontainers.containers.ContainerState;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Optional;
+
+@Slf4j
+public class AirflowContainerEnv {
+
+ public static String BASE_URL = "http://localhost:8080";
+ public static String AIRFLOW_USERNAME = "airflow";
+ public static String AIRFLOW_PASSWORD = "airflow";
+ public static String NORMAL_POSTFIX = "_normal";
+    public static String CRON_POSTFIX = "_cron";
+ public static String AIRFLOW_SCHEDULER_CONTAINER_NAME =
"airflow-scheduler_1";
+ public static String DOCKER_COMPOSE_YAML_PATH =
"src/test/resources/airflow/docker-compose.yaml";
+ public static String DEFAULT_DAGS_PATH = "/opt/airflow/dags/";
+
+ private static DockerComposeContainer<?> environment;
+ private static OkHttpClient httpClient = new OkHttpClient();
+
+ public static void setUp() {
+ // Step 1: Start only the airflow-init service
+ environment = new DockerComposeContainer<>(new
File(DOCKER_COMPOSE_YAML_PATH))
+ .withServices("airflow-init")
+ .withEnv("AIRFLOW_UID", "$(id -u)");
+ // Start the environment
+ environment.start();
+ // Step 2: Wait until the "airflow-init" service has completed
initialization
+ // Once initialized, stop the init-only environment and start the full
environment
+ environment.stop();
+ // Step 3: Start all services in detached mode after initialization
+ environment = new DockerComposeContainer<>(new
File(DOCKER_COMPOSE_YAML_PATH))
+ .withEnv("AIRFLOW_UID", "0")
+ .withEnv("AIRFLOW__CORE__LOAD_EXAMPLES", "false")
+ .withEnv("AIRFLOW__API__AUTH_BACKEND",
+
"airflow.providers.fab.auth_manager.api.auth.backend.basic_auth");
+ environment.start();
+ copyTestDAGs();
+ waitForDAGsLoad("dag_cleaner");
+ log.info("Airflow runtime environment created successfully.");
+ }
+
+ public static void shutDown() {
+ if (environment != null) {
+ environment.stop();
+ }
+ }
+
+ private static void copyTestDAGs() {
+ // After the DAG file is created, the scheduler will regularly scan
the DAG file directory and
+ // then load it into memory for scheduling. In order to quickly test
the update and unregister, two
+ // test DAGs need to be loaded at the beginning.
+ Optional<ContainerState> container =
environment.getContainerByServiceName(AIRFLOW_SCHEDULER_CONTAINER_NAME);
+ if (container.isPresent()) {
+ ContainerState airflowScheduler = container.get();
+ Path dagPath1 =
Paths.get("src/test/resources/airflow/dag_cleaner.py").toAbsolutePath();
+ Path dagPath2 =
Paths.get("src/test/resources/airflow/dag_creator.py").toAbsolutePath();
+ Path dagPath3 =
Paths.get("src/test/resources/airflow/testGroup_cron.py").toAbsolutePath();
+ Path dagPath4 =
Paths.get("src/test/resources/airflow/testGroup_normal.py").toAbsolutePath();
+
airflowScheduler.copyFileToContainer(MountableFile.forHostPath(dagPath1),
+ DEFAULT_DAGS_PATH.concat("dag_cleaner.py"));
+
airflowScheduler.copyFileToContainer(MountableFile.forHostPath(dagPath2),
+ DEFAULT_DAGS_PATH.concat("dag_creator.py"));
+
airflowScheduler.copyFileToContainer(MountableFile.forHostPath(dagPath3),
+ DEFAULT_DAGS_PATH.concat("testGroup_cron.py"));
+
airflowScheduler.copyFileToContainer(MountableFile.forHostPath(dagPath4),
+ DEFAULT_DAGS_PATH.concat("testGroup_normal.py"));
+ try {
+ String result =
+ airflowScheduler.execInContainer("bash", "-c", "ls
".concat(DEFAULT_DAGS_PATH)).getStdout();
+ log.info(DEFAULT_DAGS_PATH.concat(" has file: {}"), result);
+ } catch (Exception e) {
+ log.warn(String.format(
+ "Copying the test DAG file may have failed. Docker
Container command(\"%s\") execution failed.",
+                        "ls ".concat(DEFAULT_DAGS_PATH)), e);
+ }
+ } else {
+ log.error(String.format("Copying test DAG file failed. Airflow
scheduler container(%s) does not exist.",
+ AIRFLOW_SCHEDULER_CONTAINER_NAME));
+ throw new AirflowScheduleException("Copying test DAG file
failed.");
+ }
+        log.info("Copied test DAG files successfully.");
+ }
+
+ public static void waitForDAGsLoad(String dagId) {
+ int total = 10;
+ // Waiting for Airflow to load the initial DAG
+ while (total > 0) {
+ String credential = okhttp3.Credentials.basic(AIRFLOW_USERNAME,
AIRFLOW_PASSWORD);
+ Request request = new Request.Builder()
+ .url(BASE_URL + "/api/v1/dags/" + dagId + "/details")
+ .header("Authorization", credential)
+ .build();
+ try (Response response = httpClient.newCall(request).execute()) {
+ if (response.code() == 200) {
+ break;
+ }
+ } catch (Exception e) {
+ log.error("The request to check if the original DAG exists
failed: {}", e.getMessage(), e);
+ }
+ try {
+ Thread.sleep(30000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ total--;
+ }
+ log.info("DAG successfully loaded.");
+ }
+}
diff --git
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngineTest.java
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngineTest.java
new file mode 100644
index 0000000000..fe5d070afd
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngineTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.airflow;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.schedule.BaseScheduleTest;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import
org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.ComponentScan;
+
+import static
org.apache.inlong.manager.schedule.airflow.AirflowContainerEnv.CRON_POSTFIX;
+import static
org.apache.inlong.manager.schedule.airflow.AirflowContainerEnv.NORMAL_POSTFIX;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Slf4j
+@EnableConfigurationProperties
+@ComponentScan(basePackages = "org.apache.inlong.manager")
+@SpringBootTest(classes = AirflowScheduleEngineTest.class)
+public class AirflowScheduleEngineTest {
+
+ @Autowired
+ private AirflowScheduleEngine scheduleEngine;
+ private static BaseScheduleTest baseScheduleTest = new BaseScheduleTest();
+
+ @BeforeAll
+ public static void initScheduleEngine() {
+ try {
+ AirflowContainerEnv.setUp();
+ } catch (Exception e) {
+ log.error("Airflow runtime environment creation failed.", e);
+ throw new RuntimeException(
+ String.format("Airflow runtime environment creation
failed: %s", e.getMessage()));
+ }
+ }
+
+ @AfterAll
+ public static void stopScheduleEngine() {
+ AirflowContainerEnv.shutDown();
+ }
+
+ @Test
+ @Order(1)
+ public void testRegisterScheduleInfo() {
+ // 1. test for normal schedule
+ ScheduleInfo scheduleInfo = baseScheduleTest.genDefaultScheduleInfo();
+ String groupId = scheduleInfo.getInlongGroupId() + NORMAL_POSTFIX +
System.currentTimeMillis();
+ scheduleInfo.setInlongGroupId(groupId);
+ assertTrue(scheduleEngine.handleRegister(scheduleInfo));
+
+ // 2. test for cron schedule
+ scheduleInfo = baseScheduleTest.genDefaultCronScheduleInfo();
+        groupId = scheduleInfo.getInlongGroupId() + CRON_POSTFIX +
System.currentTimeMillis();
+ scheduleInfo.setInlongGroupId(groupId);
+ assertTrue(scheduleEngine.handleRegister(scheduleInfo));
+ }
+
+ @Test
+ @Order(2)
+ public void testUpdateScheduleInfo() {
+ // 1. test for normal schedule
+ ScheduleInfo scheduleInfo = baseScheduleTest.genDefaultScheduleInfo();
+ String groupId = scheduleInfo.getInlongGroupId() + NORMAL_POSTFIX;
+ scheduleInfo.setInlongGroupId(groupId);
+ assertTrue(scheduleEngine.handleUpdate(scheduleInfo));
+
+ // 2. test for cron schedule, gen cron schedule info, */2 * * * * ?
+ scheduleInfo = baseScheduleTest.genDefaultCronScheduleInfo();
+        groupId = scheduleInfo.getInlongGroupId() + CRON_POSTFIX;
+ scheduleInfo.setInlongGroupId(groupId);
+ assertTrue(scheduleEngine.handleUpdate(scheduleInfo));
+ }
+
+ @Test
+ @Order(3)
+ public void testUnRegisterScheduleInfo() {
+ // 1. test for normal schedule
+ ScheduleInfo scheduleInfo = baseScheduleTest.genDefaultScheduleInfo();
+ String groupId = scheduleInfo.getInlongGroupId() + NORMAL_POSTFIX;
+ scheduleInfo.setInlongGroupId(groupId);
+
assertTrue(scheduleEngine.handleUnregister(scheduleInfo.getInlongGroupId()));
+
+ // 2. test for cron schedule, gen cron schedule info, */2 * * * * ?
+ scheduleInfo = baseScheduleTest.genDefaultCronScheduleInfo();
+        groupId = scheduleInfo.getInlongGroupId() + CRON_POSTFIX;
+ scheduleInfo.setInlongGroupId(groupId);
+
assertTrue(scheduleEngine.handleUnregister(scheduleInfo.getInlongGroupId()));
+ }
+}
diff --git
a/inlong-manager/manager-schedule/src/test/resources/airflow/dag_cleaner.py
b/inlong-manager/manager-schedule/src/test/resources/airflow/dag_cleaner.py
new file mode 100644
index 0000000000..be20fe1bb1
--- /dev/null
+++ b/inlong-manager/manager-schedule/src/test/resources/airflow/dag_cleaner.py
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow import DAG
+from datetime import datetime, timedelta
+from airflow.operators.python_operator import PythonOperator
+import os
+import logging
+import pytz
+from airflow import configuration
+
+DAG_PATH = configuration.get('core', 'dags_folder') + "/"
+
+
+def clean_expired_dags(**context):
+ original_time = context.get('execution_date')
+ target_timezone = pytz.timezone("Asia/Shanghai")
+    local_time = original_time.astimezone(target_timezone)
+    current_time = local_time.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
+ logging.info(f"Current time: {current_time}")
+ for dag_file in os.listdir(DAG_PATH):
+ if dag_file.endswith(".py") and
dag_file.startswith("inlong_offline_task_"):
+ with open(DAG_PATH + dag_file, "r") as file:
+ line = file.readline()
+ while line and "end_offset_datetime_str" not in line:
+ line = file.readline()
+ end_date_str = None
+ if len(line.split("=")) > 1:
+ end_date_str = line.split("=")[1].strip().strip("\"")
+ logging.info(f"DAG end time: {end_date_str}")
+ if end_date_str:
+ try:
+ if str(current_time) > str(end_date_str):
+ dag_file_path = os.path.join(DAG_PATH, dag_file)
+ os.remove(dag_file_path)
+ # Optionally, delete the end_date variable
+ logging.info(f"Deleted expired DAG: {dag_file}")
+ except ValueError:
+ logging.error(f"Invalid date format for DAG
{dag_file}: {end_date_str}")
+
+
+default_args = {
+ 'owner': 'airflow',
+ 'start_date': datetime.now() - timedelta(minutes=5),
+ 'catchup': False,
+ 'tags': ["inlong"]
+}
+
+dag = DAG(
+ 'dag_cleaner',
+ default_args=default_args,
+ schedule_interval="*/20 * * * *",
+ is_paused_upon_creation=False
+)
+
+clean_task = PythonOperator(
+ task_id='clean_expired_dags',
+ python_callable=clean_expired_dags,
+ provide_context=True,
+ dag=dag,
+)
diff --git
a/inlong-manager/manager-schedule/src/test/resources/airflow/dag_creator.py
b/inlong-manager/manager-schedule/src/test/resources/airflow/dag_creator.py
new file mode 100644
index 0000000000..4034cf467c
--- /dev/null
+++ b/inlong-manager/manager-schedule/src/test/resources/airflow/dag_creator.py
@@ -0,0 +1,148 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow import DAG
+from airflow.operators.python_operator import PythonOperator
+from airflow.utils.dates import days_ago
+from airflow.models import Variable
+import os
+from datetime import datetime
+from airflow.hooks.base_hook import BaseHook
+from airflow import configuration
+
+DAG_PATH = configuration.get('core', 'dags_folder') + "/"
+DAG_PREFIX = 'inlong_offline_task_'
+
+def create_dag_file(**context):
+ conf = context.get('dag_run').conf
+ print('conf: ', conf)
+ groupId = conf.get('inlong_group_id')
+ task_name = DAG_PREFIX + groupId
+ timezone = conf.get('timezone')
+ boundaryType = str(conf.get('boundary_type'))
+ start_time = int(conf.get('start_time'))
+ end_time = int(conf.get('end_time'))
+ cron_expr = conf.get('cron_expr')
+ seconds_interval = conf.get('seconds_interval')
+ schedule_interval = cron_expr
+ if cron_expr is None or len(cron_expr) == 0:
+ schedule_interval = f'timedelta(seconds={seconds_interval})'
+ else:
+ schedule_interval = '"' + cron_expr + '"'
+ connectionId = conf.get('connection_id')
+ dag_content = f'''from airflow import DAG
+from datetime import datetime, timedelta
+from airflow.operators.python_operator import PythonOperator
+from datetime import datetime
+from croniter import croniter
+from airflow.hooks.base_hook import BaseHook
+import requests
+import pytz
+
+timezone = "{timezone}"
+start_offset_datetime_str = {start_time}
+end_offset_datetime_str = {end_time}
+schedule_interval = {schedule_interval} # Or put cron expression
+dag_id = "{task_name}"
+groupId = "{groupId}"
+connectionId = "{connectionId}"
+boundaryType = "{boundaryType}"
+
+target_timezone = pytz.timezone(timezone)
+
+start_date = datetime.fromtimestamp(start_offset_datetime_str / 1000,
tz=target_timezone)
+end_date = datetime.fromtimestamp(end_offset_datetime_str / 1000,
tz=target_timezone)
+
+def taskFunction(**context):
+ print("#########################")
+ conn = BaseHook.get_connection(connectionId)
+ url = f"http://{{conn.host}}:{{conn.port}}/{{conn.schema}}"
+ params = {{
+ "username": conn.login,
+ "password": conn.password
+ }}
+ print("params", params)
+ headers = {{
+ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:131.0)
Gecko/20100101 Firefox/131.0",
+ "Accept": "application/json",
+ "Accept-Language":
"zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
+ "Accept-Encoding": "gzip, deflate",
+ "Referer": "http://192.168.101.2:8083/",
+ "Content-Type": "application/json;charset=UTF-8",
+ "tenant": "public",
+ "Origin": "http://192.168.101.2",
+ "Connection": "close",
+ "Priority": "u=0"
+ }}
+ time_interval = get_time_interval(context)
+ data = {{
+ "boundaryType": boundaryType,
+ "groupId": groupId,
+ "lowerBoundary": str(int(time_interval[0])),
+ "upperBoundary": str(int(int(time_interval[1])))
+ }}
+ print("Request Body: ", data)
+ response = requests.post(url, params=params, headers=headers, json=data)
+ if response.status_code == 200:
+ print(response.json())
+ else:
+ print(response.text)
+ print("#########################")
+
+
+def get_time_interval(context):
+ execution_date = context.get('execution_date')
+ execution_date = execution_date.astimezone(target_timezone)
+ dag = context.get('dag')
+ schedule_interval = dag.schedule_interval
+ if isinstance(schedule_interval, timedelta):
+ return execution_date.timestamp(), (execution_date +
schedule_interval).timestamp()
+ else:
+ cron_expr = dag.schedule_interval
+ cron = croniter(cron_expr, execution_date)
+ next_run = cron.get_next(datetime)
+ return execution_date.timestamp(), next_run.timestamp()
+
+
+default_args = {{
+ 'owner': 'inlong',
+ 'start_date': start_date,
+ 'end_date': end_date,
+ 'catchup': False,
+}}
+
+dag = DAG(
+ dag_id,
+ default_args=default_args,
+ schedule_interval=schedule_interval,
+ is_paused_upon_creation=False
+)
+
+clean_task = PythonOperator(
+ task_id=dag_id,
+ python_callable=taskFunction,
+ provide_context=True,
+ dag=dag,
+)
+ '''
+ dag_file_path = os.path.join(DAG_PATH, f'{task_name}.py')
+ with open(dag_file_path, 'w') as f:
+ f.write(dag_content)
+ print(f'Generated DAG file: {dag_file_path}')
+default_args = {'owner': 'airflow', 'start_date': days_ago(1), 'catchup':
False}
+dag = DAG('dag_creator', default_args=default_args, schedule_interval=None,
is_paused_upon_creation=False)
+create_dag_task = PythonOperator(task_id='create_dag_file',
python_callable=create_dag_file, provide_context=True, dag=dag)
\ No newline at end of file
diff --git
a/inlong-manager/manager-schedule/src/test/resources/airflow/docker-compose.yaml
b/inlong-manager/manager-schedule/src/test/resources/airflow/docker-compose.yaml
new file mode 100644
index 0000000000..c97195c03f
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/test/resources/airflow/docker-compose.yaml
@@ -0,0 +1,292 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Basic Airflow cluster configuration for CeleryExecutor with Redis and
PostgreSQL.
+#
+# WARNING: This configuration is for local development. Do not use it in a
production deployment.
+#
+# This configuration supports basic configuration using environment variables
or an .env file
+# The following variables are supported:
+#
+# AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow.
+# Default: apache/airflow:2.6.0
+# AIRFLOW_UID - User ID in Airflow containers
+# Default: 50000
+# AIRFLOW_PROJ_DIR - Base path to which all the files will be
volumed.
+# Default: .
+# Those configurations are useful mostly in case of standalone testing/running
Airflow in test/try-out mode
+#
+# _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account (if
requested).
+# Default: airflow
+# _AIRFLOW_WWW_USER_PASSWORD - Password for the administrator account (if
requested).
+# Default: airflow
+# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when
starting all containers.
+# Use this option ONLY for quick checks.
Installing requirements at container
+# startup is done EVERY TIME the service is
started.
+# A better way is to build a custom image or
extend the official image
+# as described in
https://airflow.apache.org/docs/docker-stack/build.html.
+# Default: ''
+#
+# Feel free to modify this file to suit your needs.
+---
+version: '3.8'
+x-airflow-common:
+ &airflow-common
+ # In order to add custom dependencies or upgrade provider packages you can
use your extended image.
+ # Comment the image line, place your Dockerfile in the directory where you
placed the docker-compose.yaml
+ # and uncomment the "build" line below, Then run `docker-compose build` to
build the images.
+ image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.6.0}
+ # build: .
+ environment:
+ &airflow-common-env
+ AIRFLOW__CORE__EXECUTOR: CeleryExecutor
+ AIRFLOW__DATABASE__SQL_ALCHEMY_CONN:
postgresql+psycopg2://airflow:airflow@postgres/airflow
+ # For backward compatibility, with Airflow <2.3
+ AIRFLOW__CORE__SQL_ALCHEMY_CONN:
postgresql+psycopg2://airflow:airflow@postgres/airflow
+ AIRFLOW__CELERY__RESULT_BACKEND:
db+postgresql://airflow:airflow@postgres/airflow
+ AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
+ AIRFLOW__CORE__FERNET_KEY: ''
+ AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
+ AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
+ AIRFLOW__API__AUTH_BACKENDS:
'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
+ # yamllint disable rule:line-length
+ # Use simple http server on scheduler for health checks
+ # See
https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server
+ # yamllint enable rule:line-length
+ AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
+ # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks
+ # for other purpose (development, test and especially production usage)
build/extend Airflow image.
+ _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
+ user: "${AIRFLOW_UID:-50000}:0"
+ depends_on:
+ &airflow-common-depends-on
+ redis:
+ condition: service_healthy
+ postgres:
+ condition: service_healthy
+
+services:
+ postgres:
+ image: postgres:13
+ environment:
+ POSTGRES_USER: airflow
+ POSTGRES_PASSWORD: airflow
+ POSTGRES_DB: airflow
+ volumes:
+ - postgres-db-volume:/var/lib/postgresql/data
+ healthcheck:
+ test: ["CMD", "pg_isready", "-U", "airflow"]
+ interval: 10s
+ retries: 5
+ start_period: 5s
+ restart: always
+
+ redis:
+ image: redis:latest
+ expose:
+ - 6379
+ healthcheck:
+ test: ["CMD", "redis-cli", "ping"]
+ interval: 10s
+ timeout: 30s
+ retries: 50
+ start_period: 30s
+ restart: always
+
+ airflow-webserver:
+ <<: *airflow-common
+ command: webserver
+ ports:
+ - "8080:8080"
+ healthcheck:
+ test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
+ interval: 30s
+ timeout: 10s
+ retries: 5
+ start_period: 30s
+ restart: always
+ depends_on:
+ <<: *airflow-common-depends-on
+ airflow-init:
+ condition: service_completed_successfully
+
+ airflow-scheduler:
+ <<: *airflow-common
+ command: scheduler
+ healthcheck:
+ test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
+ interval: 30s
+ timeout: 10s
+ retries: 5
+ start_period: 30s
+ restart: always
+ depends_on:
+ <<: *airflow-common-depends-on
+ airflow-init:
+ condition: service_completed_successfully
+
+ airflow-worker:
+ <<: *airflow-common
+ command: celery worker
+ healthcheck:
+ test:
+ - "CMD-SHELL"
+ - 'celery --app airflow.executors.celery_executor.app inspect ping -d
"celery@$${HOSTNAME}"'
+ interval: 30s
+ timeout: 10s
+ retries: 5
+ start_period: 30s
+ environment:
+ <<: *airflow-common-env
+ # Required to handle warm shutdown of the celery workers properly
+ # See
https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
+ DUMB_INIT_SETSID: "0"
+ restart: always
+ depends_on:
+ <<: *airflow-common-depends-on
+ airflow-init:
+ condition: service_completed_successfully
+
+ airflow-triggerer:
+ <<: *airflow-common
+ command: triggerer
+ healthcheck:
+ test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob
--hostname "$${HOSTNAME}"']
+ interval: 30s
+ timeout: 10s
+ retries: 5
+ start_period: 30s
+ restart: always
+ depends_on:
+ <<: *airflow-common-depends-on
+ airflow-init:
+ condition: service_completed_successfully
+
+ airflow-init:
+ <<: *airflow-common
+ entrypoint: /bin/bash
+ # yamllint disable rule:line-length
+ command:
+ - -c
+ - |
+ function ver() {
+ printf "%04d%04d%04d%04d" $${1//./ }
+ }
+ airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu
airflow airflow version)
+ airflow_version_comparable=$$(ver $${airflow_version})
+ min_airflow_version=2.2.0
+ min_airflow_version_comparable=$$(ver $${min_airflow_version})
+ if (( airflow_version_comparable < min_airflow_version_comparable ));
then
+ echo
+ echo -e "\033[1;31mERROR!!!: Too old Airflow version
$${airflow_version}!\e[0m"
+ echo "The minimum Airflow version supported:
$${min_airflow_version}. Only use this or higher!"
+ echo
+ exit 1
+ fi
+ if [[ -z "${AIRFLOW_UID}" ]]; then
+ echo
+ echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
+ echo "If you are on Linux, you SHOULD follow the instructions below
to set "
+ echo "AIRFLOW_UID environment variable, otherwise files will be
owned by root."
+ echo "For other operating systems you can get rid of the warning
with manually created .env file:"
+ echo " See:
https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
+ echo
+ fi
+ one_meg=1048576
+ mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) /
one_meg))
+ cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
+ disk_available=$$(df / | tail -1 | awk '{print $$4}')
+ warning_resources="false"
+ if (( mem_available < 4000 )) ; then
+ echo
+ echo -e "\033[1;33mWARNING!!!: Not enough memory available for
Docker.\e[0m"
+ echo "At least 4GB of memory required. You have $$(numfmt --to iec
$$((mem_available * one_meg)))"
+ echo
+ warning_resources="true"
+ fi
+ if (( cpus_available < 2 )); then
+ echo
+ echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for
Docker.\e[0m"
+ echo "At least 2 CPUs recommended. You have $${cpus_available}"
+ echo
+ warning_resources="true"
+ fi
+ if (( disk_available < one_meg * 10 )); then
+ echo
+ echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for
Docker.\e[0m"
+ echo "At least 10 GBs recommended. You have $$(numfmt --to iec
$$((disk_available * 1024 )))"
+ echo
+ warning_resources="true"
+ fi
+ if [[ $${warning_resources} == "true" ]]; then
+ echo
+ echo -e "\033[1;33mWARNING!!!: You have not enough resources to run
Airflow (see above)!\e[0m"
+ echo "Please follow the instructions to increase amount of resources
available:"
+ echo "
https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
+ echo
+ fi
+ exec /entrypoint airflow version
+ # yamllint enable rule:line-length
+ environment:
+ <<: *airflow-common-env
+ _AIRFLOW_DB_UPGRADE: 'true'
+ _AIRFLOW_WWW_USER_CREATE: 'true'
+ _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
+ _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
+ _PIP_ADDITIONAL_REQUIREMENTS: ''
+ user: "0:0"
+ volumes:
+ - ${AIRFLOW_PROJ_DIR:-.}:/sources
+
+ airflow-cli:
+ <<: *airflow-common
+ profiles:
+ - debug
+ environment:
+ <<: *airflow-common-env
+ CONNECTION_CHECK_MAX_COUNT: "0"
+ # Workaround for entrypoint issue. See:
https://github.com/apache/airflow/issues/16252
+ command:
+ - bash
+ - -c
+ - airflow
+
+ # You can enable flower by adding "--profile flower" option e.g.
docker-compose --profile flower up
+ # or by explicitly targeted on the command line e.g. docker-compose up
flower.
+ # See: https://docs.docker.com/compose/profiles/
+ flower:
+ <<: *airflow-common
+ command: celery flower
+ profiles:
+ - flower
+ ports:
+ - "5555:5555"
+ healthcheck:
+ test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
+ interval: 30s
+ timeout: 10s
+ retries: 5
+ start_period: 30s
+ restart: always
+ depends_on:
+ <<: *airflow-common-depends-on
+ airflow-init:
+ condition: service_completed_successfully
+
+volumes:
+ postgres-db-volume:
diff --git
a/inlong-manager/manager-schedule/src/test/resources/airflow/testGroup_cron.py
b/inlong-manager/manager-schedule/src/test/resources/airflow/testGroup_cron.py
new file mode 100644
index 0000000000..b753eb7587
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/test/resources/airflow/testGroup_cron.py
@@ -0,0 +1,112 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow import DAG
+from datetime import datetime, timedelta
+from airflow.operators.python_operator import PythonOperator
+from croniter import croniter
+from airflow.hooks.base_hook import BaseHook
+import requests
+import pytz
+
+timezone = "Asia/Shanghai"
+start_offset_datetime_str = 1731072908243
+end_offset_datetime_str = 1731142800000
+schedule_interval = "*/1 * * * *" # Or put cron expression
+dag_id = "inlong_offline_task_testGroup_cron"
+groupId = "test_offline_1"
+connectionId = "inlong_connection"
+boundaryType = str("time")
+
+target_timezone = pytz.timezone(timezone)
+
+start_date = datetime.fromtimestamp(start_offset_datetime_str / 1000, tz=target_timezone)
+end_date = datetime.fromtimestamp(end_offset_datetime_str / 1000, tz=target_timezone)
+
+
+def taskFunction(**context):
+ print("#########################")
+ conn = BaseHook.get_connection(connectionId)
+ url = f"http://{conn.host}:{conn.port}/{conn.schema}"
+ params = {
+ "username": conn.login,
+ "password": conn.password
+ }
+ print("params", params)
+ headers = {
+ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:131.0)
Gecko/20100101 Firefox/131.0",
+ "Accept": "application/json",
+ "Accept-Language":
"zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
+ "Accept-Encoding": "gzip, deflate",
+ "Referer": "http://192.168.101.2:8083/",
+ "Content-Type": "application/json;charset=UTF-8",
+ "tenant": "public",
+ "Origin": "http://192.168.101.2",
+ "Connection": "close",
+ "Priority": "u=0"
+ }
+ time_interval = get_time_interval(context)
+ data = {
+ "boundaryType": boundaryType,
+ "groupId": groupId,
+ "lowerBoundary": time_interval[0],
+ "upperBoundary": time_interval[1]
+ }
+ print("Request Body: ", data)
+ response = requests.post(url, params=params, headers=headers, json=data)
+ if response.status_code == 200:
+ print(response.json())
+ else:
+ print(response.text)
+ print("#########################")
+
+
+def get_time_interval(context):
+ execution_date = context.get('execution_date')
+ execution_date = execution_date.astimezone(target_timezone)
+ dag = context.get('dag')
+ schedule_interval = dag.schedule_interval
+ if isinstance(schedule_interval, timedelta):
+ return execution_date.timestamp(), (execution_date + schedule_interval).timestamp()
+ else:
+ cron_expr = dag.schedule_interval
+ cron = croniter(cron_expr, execution_date)
+ next_run = cron.get_next(datetime)
+ return execution_date.timestamp(), next_run.timestamp()
+
+
+default_args = {
+ 'owner': 'inlong',
+ 'start_date': start_date,
+ 'end_date': end_date,
+ 'catchup': False,
+}
+
+dag = DAG(
+ dag_id,
+ default_args=default_args,
+ schedule_interval=schedule_interval,
+ is_paused_upon_creation=False
+)
+
+clean_task = PythonOperator(
+ task_id=dag_id,
+ python_callable=taskFunction,
+ provide_context=True,
+ dag=dag,
+)
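
Note: in the generated DAG above, get_time_interval turns each run into the (lowerBoundary, upperBoundary) pair posted to InLong Manager: the execution date, converted to the configured timezone, is the lower bound, and the next fire time of the cron expression, computed with croniter, is the upper bound; both are sent as epoch seconds. A standalone sketch of that computation, using illustrative values only, looks like this:

    from datetime import datetime
    from croniter import croniter
    import pytz

    target_timezone = pytz.timezone("Asia/Shanghai")
    # Illustrative execution date and the same one-minute cron expression as above
    execution_date = datetime.fromtimestamp(1731072908.243, tz=target_timezone)
    cron = croniter("*/1 * * * *", execution_date)
    next_run = cron.get_next(datetime)

    lower_boundary = execution_date.timestamp()   # e.g. 1731072908.243
    upper_boundary = next_run.timestamp()         # first cron tick after execution_date
    print(lower_boundary, upper_boundary)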
diff --git
a/inlong-manager/manager-schedule/src/test/resources/airflow/testGroup_normal.py
b/inlong-manager/manager-schedule/src/test/resources/airflow/testGroup_normal.py
new file mode 100644
index 0000000000..5666f9f471
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/test/resources/airflow/testGroup_normal.py
@@ -0,0 +1,110 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow import DAG
+from datetime import datetime, timedelta
+from airflow.operators.python_operator import PythonOperator
+from croniter import croniter
+from airflow.hooks.base_hook import BaseHook
+import requests
+import pytz
+
+timezone = "Asia/Shanghai"
+start_offset_datetime_str = 1731072908243
+end_offset_datetime_str = 1731142800000
+schedule_interval = "*/1 * * * *" # Or put cron expression
+dag_id = "inlong_offline_task_testGroup_normal"
+groupId = "test_offline_1"
+connectionId = "inlong_connection"
+boundaryType = str("time")
+
+target_timezone = pytz.timezone(timezone)
+
+start_date = datetime.fromtimestamp(start_offset_datetime_str / 1000, tz=target_timezone)
+end_date = datetime.fromtimestamp(end_offset_datetime_str / 1000, tz=target_timezone)
+
+
+def taskFunction(**context):
+ print("#########################")
+ conn = BaseHook.get_connection(connectionId)
+ url = f"http://{conn.host}:{conn.port}/{conn.schema}"
+ params = {
+ "username": conn.login,
+ "password": conn.password
+ }
+ print("params", params)
+ headers = {
+ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:131.0)
Gecko/20100101 Firefox/131.0",
+ "Accept": "application/json",
+ "Accept-Language":
"zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
+ "Accept-Encoding": "gzip, deflate",
+ "Content-Type": "application/json;charset=UTF-8",
+ "tenant": "public",
+ "Connection": "close",
+ "Priority": "u=0"
+ }
+ time_interval = get_time_interval(context)
+ data = {
+ "boundaryType": boundaryType,
+ "groupId": groupId,
+ "lowerBoundary": time_interval[0],
+ "upperBoundary": time_interval[1]
+ }
+ print("Request Body: ", data)
+ response = requests.post(url, params=params, headers=headers, json=data)
+ if response.status_code == 200:
+ print(response.json())
+ else:
+ print(response.text)
+ print("#########################")
+
+
+def get_time_interval(context):
+ execution_date = context.get('execution_date')
+ execution_date = execution_date.astimezone(target_timezone)
+ dag = context.get('dag')
+ schedule_interval = dag.schedule_interval
+ if isinstance(schedule_interval, timedelta):
+ return execution_date.timestamp(), (execution_date + schedule_interval).timestamp()
+ else:
+ cron_expr = dag.schedule_interval
+ cron = croniter(cron_expr, execution_date)
+ next_run = cron.get_next(datetime)
+ return execution_date.timestamp(), next_run.timestamp()
+
+
+default_args = {
+ 'owner': 'inlong',
+ 'start_date': start_date,
+ 'end_date': end_date,
+ 'catchup': False,
+}
+
+dag = DAG(
+ dag_id,
+ default_args=default_args,
+ schedule_interval=schedule_interval,
+ is_paused_upon_creation=False
+)
+
+clean_task = PythonOperator(
+ task_id=dag_id,
+ python_callable=taskFunction,
+ provide_context=True,
+ dag=dag,
+)
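
Note: both test DAGs resolve their HTTP target and credentials from the Airflow connection named inlong_connection via BaseHook.get_connection, building the callback URL as http://{host}:{port}/{schema}. The commit also adds an AirflowConnectionCreator under manager-schedule, which appears to handle this registration programmatically; for manual setup or debugging, the same connection can be created through Airflow's stable REST API, roughly as sketched below (assuming the basic-auth API backend is enabled, as in the bundled docker-compose; every value is a placeholder, not a documented default):

    import requests

    airflow_base_url = "http://127.0.0.1:8080"   # placeholder Airflow webserver address
    payload = {
        "connection_id": "inlong_connection",    # matches connectionId in the DAG scripts
        "conn_type": "http",
        "host": "192.168.1.10",                  # InLong Manager host, not a loopback address
        "port": 8083,                            # placeholder Manager port
        "schema": "<manager-api-path>",          # the DAGs build http://host:port/<schema>
        "login": "<manager-username>",
        "password": "<manager-password>",
    }
    resp = requests.post(
        f"{airflow_base_url}/api/v1/connections",
        json=payload,
        auth=("airflow", "airflow"),             # Airflow web user, basic auth
    )
    print(resp.status_code, resp.text)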
diff --git
a/inlong-manager/manager-web/src/main/resources/application-dev.properties
b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index b0193111aa..f17c451aee 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -113,3 +113,13 @@
dirty.log.db.table=inlong_iceberg::dirty_data_achive_iceberg
# DolphinScheduler related config
schedule.engine.dolphinscheduler.url=http://127.0.0.1:12345/dolphinscheduler
schedule.engine.dolphinscheduler.token=default_token_value
+
+# Airflow configuration
+schedule.engine.airflow.baseUrl=
+schedule.engine.airflow.username=
+schedule.engine.airflow.password=
+schedule.engine.airflow.connection.id=
+# Make sure this is not a loopback address, so that Airflow can reach the InLong Manager
+schedule.engine.airflow.inlong.manager.host=
+schedule.engine.airflow.cleaner.id=
+schedule.engine.airflow.creator.id=
\ No newline at end of file
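
Note: no defaults are shipped for the Airflow properties above. A filled-in block might look like the following; all values are illustrative placeholders, and the cleaner/creator ids are assumed to correspond to the dag_cleaner.py and dag_creator.py DAGs added under the test resources. The same block is repeated in the prod and test profiles below.

    # Airflow configuration (illustrative placeholder values)
    schedule.engine.airflow.baseUrl=http://127.0.0.1:8080
    schedule.engine.airflow.username=airflow
    schedule.engine.airflow.password=airflow
    schedule.engine.airflow.connection.id=inlong_connection
    # Must not be a loopback address; Airflow has to reach the InLong Manager at this host
    schedule.engine.airflow.inlong.manager.host=192.168.1.10
    schedule.engine.airflow.cleaner.id=dag_cleaner
    schedule.engine.airflow.creator.id=dag_creator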
diff --git
a/inlong-manager/manager-web/src/main/resources/application-prod.properties
b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index a549e206f0..129bce9ac6 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -104,3 +104,13 @@
dirty.log.db.table=inlong_iceberg::dirty_data_achive_iceberg
# DolphinScheduler related config
schedule.engine.dolphinscheduler.url=http://127.0.0.1:12345/dolphinscheduler
schedule.engine.dolphinscheduler.token=default_token_value
+
+# Airflow configuration
+schedule.engine.airflow.baseUrl=
+schedule.engine.airflow.username=
+schedule.engine.airflow.password=
+schedule.engine.airflow.connection.id=
+# Make sure this is not a loopback address, so that Airflow can reach the InLong Manager
+schedule.engine.airflow.inlong.manager.host=
+schedule.engine.airflow.cleaner.id=
+schedule.engine.airflow.creator.id=
\ No newline at end of file
diff --git
a/inlong-manager/manager-web/src/main/resources/application-test.properties
b/inlong-manager/manager-web/src/main/resources/application-test.properties
index 6a71dc7a05..a062bc2c63 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -105,3 +105,13 @@
dirty.log.db.table=inlong_iceberg::dirty_data_achive_iceberg
# DolphinScheduler related config
schedule.engine.dolphinscheduler.url=http://127.0.0.1:12345/dolphinscheduler
schedule.engine.dolphinscheduler.token=default_token_value
+
+# Airflow configuration
+schedule.engine.airflow.baseUrl=
+schedule.engine.airflow.username=
+schedule.engine.airflow.password=
+schedule.engine.airflow.connection.id=
+# Make sure this is not a loopback address, so that Airflow can reach the InLong Manager
+schedule.engine.airflow.inlong.manager.host=
+schedule.engine.airflow.cleaner.id=
+schedule.engine.airflow.creator.id=
\ No newline at end of file