This is an automated email from the ASF dual-hosted git repository.
wakefu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 91a0cb3aa4 [INLONG-11508][Manager] Add APIs to dirty data query
(#11509)
91a0cb3aa4 is described below
commit 91a0cb3aa42fc855f3bd8c91cf0dc12ab54c5e02
Author: fuweng11 <[email protected]>
AuthorDate: Tue Nov 19 18:32:25 2024 +0800
[INLONG-11508][Manager] Add APIs to dirty data query (#11509)
* [INLONG-11508][Manager] Add APIs to dirty data query
---
.../manager/dao/entity/DirtyQueryLogEntity.java} | 27 ++-
.../dao/mapper/DirtyQueryLogEntityMapper.java} | 25 ++-
.../mappers/DirtyQueryLogEntityMapper.xml | 93 +++++++++++
.../inlong/manager/pojo/sink/BaseStreamSink.java | 17 ++
...treamSink.java => DirtyDataDetailResponse.java} | 46 +++--
.../{BaseStreamSink.java => DirtyDataRequest.java} | 38 +++--
.../manager/pojo/sink/DirtyDataResponse.java} | 21 ++-
.../pojo/sink/DirtyDataTrendDetailResponse.java} | 24 ++-
...eStreamSink.java => DirtyDataTrendRequest.java} | 35 ++--
.../inlong/manager/pojo/sink/SinkRequest.java | 3 +
.../inlong/manager/pojo/sink/StreamSink.java | 3 +
.../service/dirtyData/DirtyQueryLogService.java} | 36 ++--
.../dirtyData/impl/DirtyQueryLogServiceImpl.java | 185 +++++++++++++++++++++
.../main/resources/h2/apache_inlong_manager.sql | 18 ++
.../manager-web/sql/apache_inlong_manager.sql | 19 +++
inlong-manager/manager-web/sql/changes-2.1.0.sql | 16 ++
.../web/controller/StreamSinkController.java | 38 +++++
.../src/main/resources/application-dev.properties | 7 +-
.../src/main/resources/application-prod.properties | 7 +-
.../src/main/resources/application-test.properties | 7 +-
20 files changed, 577 insertions(+), 88 deletions(-)
diff --git a/inlong-manager/manager-web/sql/changes-2.1.0.sql
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/DirtyQueryLogEntity.java
similarity index 60%
copy from inlong-manager/manager-web/sql/changes-2.1.0.sql
copy to
inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/DirtyQueryLogEntity.java
index 0d4c984778..3554284f34 100644
--- a/inlong-manager/manager-web/sql/changes-2.1.0.sql
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/DirtyQueryLogEntity.java
@@ -15,13 +15,26 @@
* limitations under the License.
*/
--- This is the SQL change file from version 1.4.0 to the current version 1.5.0.
--- When upgrading to version 1.5.0, please execute those SQLs in the DB (such
as MySQL) used by the Manager module.
+package org.apache.inlong.manager.dao.entity;
-SET NAMES utf8mb4;
-SET FOREIGN_KEY_CHECKS = 0;
+import lombok.Data;
-USE `apache_inlong_manager`;
+import java.io.Serializable;
+import java.util.Date;
-ALTER TABLE `schedule_config`
- ADD COLUMN `schedule_engine` varchar(64) NOT NULL DEFAULT 'Quartz'
COMMENT 'Schedule engine, support Quartz, Airflow and DolphinScheduler';
+/**
+ * Row object for the {@code dirty_query_log} table: one record per submitted dirty data query.
+ * Accessors, equals/hashCode and toString are generated by Lombok's {@code @Data}.
+ */
+@Data
+public class DirtyQueryLogEntity implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ // Incremental primary key.
+ private Integer id;
+ // MD5 of the request params; the mapper's selectByMd5 looks rows up by this value.
+ private String md5;
+ // Request params, saved as a JSON string.
+ private String requestParams;
+ // Task id of the query.
+ private String taskId;
+ // Soft-delete marker: 0 means not deleted, a value > 0 means deleted.
+ private Integer isDeleted;
+ // Creator name.
+ private String creator;
+ // Modifier name.
+ private String modifier;
+ // Create time; not written by the mapper's insert, the column defaults to CURRENT_TIMESTAMP.
+ private Date createTime;
+ // Modify time; the column is declared with ON UPDATE CURRENT_TIMESTAMP.
+ private Date modifyTime;
+ // Version number, incremented by 1 after each modification.
+ private Integer version;
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-web/sql/changes-2.1.0.sql
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DirtyQueryLogEntityMapper.java
similarity index 59%
copy from inlong-manager/manager-web/sql/changes-2.1.0.sql
copy to
inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DirtyQueryLogEntityMapper.java
index 0d4c984778..08db57441a 100644
--- a/inlong-manager/manager-web/sql/changes-2.1.0.sql
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DirtyQueryLogEntityMapper.java
@@ -15,13 +15,24 @@
* limitations under the License.
*/
--- This is the SQL change file from version 1.4.0 to the current version 1.5.0.
--- When upgrading to version 1.5.0, please execute those SQLs in the DB (such
as MySQL) used by the Manager module.
+package org.apache.inlong.manager.dao.mapper;
-SET NAMES utf8mb4;
-SET FOREIGN_KEY_CHECKS = 0;
+import org.apache.inlong.manager.dao.entity.DirtyQueryLogEntity;
-USE `apache_inlong_manager`;
+import org.apache.ibatis.annotations.Param;
+import org.springframework.stereotype.Repository;
-ALTER TABLE `schedule_config`
- ADD COLUMN `schedule_engine` varchar(64) NOT NULL DEFAULT 'Quartz'
COMMENT 'Schedule engine, support Quartz, Airflow and DolphinScheduler';
+/**
+ * MyBatis mapper for the {@code dirty_query_log} table.
+ * The SQL for every method lives in {@code mappers/DirtyQueryLogEntityMapper.xml}.
+ */
+@Repository
+public interface DirtyQueryLogEntityMapper {
+
+ /**
+ * Updates the non-null md5, requestParams, taskId, isDeleted and modifier fields of the row whose
+ * id AND version both match {@code record}, and stores {@code record.version + 1} as the new version.
+ *
+ * @param record entity carrying the id, the expected current version and the fields to change
+ * @return number of affected rows (presumably 0 when the version no longer matches)
+ */
+ int updateByIdSelective(DirtyQueryLogEntity record);
+
+ /**
+ * Inserts id, md5, requestParams, taskId, creator and modifier of {@code record};
+ * the remaining columns are not part of the insert statement.
+ *
+ * @param record entity to persist
+ * @return number of inserted rows
+ */
+ int insert(DirtyQueryLogEntity record);
+
+ /**
+ * Selects the row with the given id among rows with {@code is_deleted = 0}.
+ *
+ * @param id primary key
+ * @return the matching entity, or presumably {@code null} when no live row matches
+ */
+ DirtyQueryLogEntity selectByPrimaryKey(Integer id);
+
+ /**
+ * Selects one row ({@code limit 1}) with the given md5 among rows with {@code is_deleted = 0}.
+ *
+ * @param md5 MD5 of the request params
+ * @return a matching entity, or presumably {@code null} when no live row matches
+ */
+ DirtyQueryLogEntity selectByMd5(String md5);
+
+ /**
+ * Marks every live row created at least {@code beforeMinutes} minutes ago as deleted
+ * by setting {@code is_deleted} to the row's own id.
+ *
+ * @param beforeMinutes minimum age, in minutes, of the rows to mark
+ */
+ void updateToTimeout(@Param("beforeMinutes") Integer beforeMinutes);
+
+}
\ No newline at end of file
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/DirtyQueryLogEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/DirtyQueryLogEntityMapper.xml
new file mode 100644
index 0000000000..0fb1ce8e5f
--- /dev/null
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/DirtyQueryLogEntityMapper.xml
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="org.apache.inlong.manager.dao.mapper.DirtyQueryLogEntityMapper">
+    <!-- Maps one dirty_query_log row onto DirtyQueryLogEntity. -->
+    <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.DirtyQueryLogEntity">
+        <id column="id" jdbcType="INTEGER" property="id" />
+        <result column="md5" jdbcType="VARCHAR" property="md5" />
+        <result column="request_params" jdbcType="VARCHAR" property="requestParams" />
+        <result column="task_id" jdbcType="VARCHAR" property="taskId" />
+        <result column="is_deleted" jdbcType="INTEGER" property="isDeleted" />
+        <result column="creator" jdbcType="VARCHAR" property="creator" />
+        <result column="modifier" jdbcType="VARCHAR" property="modifier" />
+        <result column="create_time" jdbcType="TIMESTAMP" property="createTime" />
+        <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime" />
+        <result column="version" jdbcType="INTEGER" property="version" />
+    </resultMap>
+
+    <!-- Column list shared by the select statements below. -->
+    <sql id="Base_Column_List">
+        id, md5, request_params, task_id, is_deleted, creator, modifier, create_time, modify_time, version
+    </sql>
+
+    <!-- Fetches the live (is_deleted = 0) row with the given primary key. -->
+    <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List" />
+        from dirty_query_log
+        where is_deleted = 0
+        and id = #{id,jdbcType=INTEGER}
+    </select>
+
+    <!-- Fetches one live row for the given request md5; "limit 1" tolerates duplicate md5 rows. -->
+    <select id="selectByMd5" parameterType="java.lang.String" resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List" />
+        from dirty_query_log
+        where is_deleted = 0
+        and md5 = #{md5,jdbcType=VARCHAR}
+        limit 1
+    </select>
+
+    <!-- Inserts a new query log; is_deleted, create_time, modify_time and version use the column defaults. -->
+    <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.DirtyQueryLogEntity">
+        insert into dirty_query_log (id, md5, request_params,
+                                     task_id, creator, modifier)
+        values (#{id,jdbcType=INTEGER}, #{md5,jdbcType=VARCHAR}, #{requestParams,jdbcType=VARCHAR},
+                #{taskId,jdbcType=VARCHAR}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR})
+    </insert>
+
+    <!--
+        Optimistic-lock update: only the non-null fields are written, the version is bumped by one,
+        and the row is matched on both id and the caller's current version.
+    -->
+    <update id="updateByIdSelective" parameterType="org.apache.inlong.manager.dao.entity.DirtyQueryLogEntity">
+        update dirty_query_log
+        <set>
+            <if test="md5 != null">
+                md5 = #{md5,jdbcType=VARCHAR},
+            </if>
+            <if test="requestParams != null">
+                request_params = #{requestParams,jdbcType=VARCHAR},
+            </if>
+            <if test="taskId != null">
+                task_id = #{taskId,jdbcType=VARCHAR},
+            </if>
+            <if test="isDeleted != null">
+                is_deleted = #{isDeleted,jdbcType=INTEGER},
+            </if>
+            <if test="modifier != null">
+                modifier = #{modifier,jdbcType=VARCHAR},
+            </if>
+            version = #{version,jdbcType=INTEGER} + 1
+        </set>
+        where id = #{id,jdbcType=INTEGER}
+        and version = #{version,jdbcType=INTEGER}
+    </update>
+
+    <!--
+        Soft-deletes every live row created at least beforeMinutes minutes ago by setting
+        is_deleted to the row's own id. The comparison operator is written as an entity
+        because a raw less-than sign is not allowed in XML character data.
+    -->
+    <update id="updateToTimeout">
+        update dirty_query_log
+        <set>
+            is_deleted = id
+        </set>
+        <where>
+            is_deleted = 0
+            and create_time &lt;= DATE_ADD(NOW(), INTERVAL -#{beforeMinutes, jdbcType=INTEGER} MINUTE)
+        </where>
+    </update>
+</mapper>
\ No newline at end of file
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
index d8df7f4a28..6ad6c9b523 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
@@ -17,12 +17,18 @@
package org.apache.inlong.manager.pojo.sink;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
+import javax.validation.constraints.NotNull;
+
/**
* The base parameter class of StreamSink, support user extend their own
business params.
*/
@@ -32,6 +38,9 @@ import lombok.NoArgsConstructor;
@ApiModel("Base info of stream sink")
public class BaseStreamSink {
+ @ApiModelProperty("Enable data archiving")
+ private Boolean enableDataArchiving;
+
@ApiModelProperty("Transform sql")
private String transformSql;
@@ -40,4 +49,12 @@ public class BaseStreamSink {
@ApiModelProperty("Stop consume time, yyyy-MM-dd HH:mm:ss format")
private String stopConsumeTime;
+
+    /**
+     * Deserializes the ext params JSON of a sink into a {@link BaseStreamSink}.
+     *
+     * @param extParams JSON string holding the base sink parameters
+     * @return the parsed base sink info
+     * @throws BusinessException when parsing fails; the message is the SINK_INFO_INCORRECT text
+     *         followed by the message of the underlying exception
+     */
+    public static BaseStreamSink getFromJson(@NotNull String extParams) {
+        final BaseStreamSink parsed;
+        try {
+            parsed = JsonUtils.parseObject(extParams, BaseStreamSink.class);
+        } catch (Exception e) {
+            String reason = ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + e.getMessage();
+            throw new BusinessException(reason);
+        }
+        return parsed;
+    }
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataDetailResponse.java
similarity index 50%
copy from
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
copy to
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataDetailResponse.java
index d8df7f4a28..2389217a40 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataDetailResponse.java
@@ -19,25 +19,45 @@ package org.apache.inlong.manager.pojo.sink;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
import lombok.Data;
-import lombok.NoArgsConstructor;
/**
- * The base parameter class of StreamSink, support user extend their own
business params.
+ * Dirty data detail info.
*/
@Data
-@AllArgsConstructor
-@NoArgsConstructor
-@ApiModel("Base info of stream sink")
-public class BaseStreamSink {
+@ApiModel("Dirty data detail info")
+public class DirtyDataDetailResponse {
- @ApiModelProperty("Transform sql")
- private String transformSql;
+ @ApiModelProperty(value = "Tdbank imp date")
+ private String tdbankImpDate;
- @ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format")
- private String startConsumeTime;
+ @ApiModelProperty(value = "Data flow id")
+ private String dataFlowId;
- @ApiModelProperty("Stop consume time, yyyy-MM-dd HH:mm:ss format")
- private String stopConsumeTime;
+ @ApiModelProperty(value = "Inlong group id")
+ private String groupId;
+
+ @ApiModelProperty(value = "Inlong stream id")
+ private String streamId;
+
+ @ApiModelProperty(value = "Report time")
+ private String reportTime;
+
+ @ApiModelProperty(value = "Data time")
+ private String dataTime;
+
+ @ApiModelProperty(value = "Server type")
+ private String serverType;
+
+ @ApiModelProperty(value = "Dirty type")
+ private String dirtyType;
+
+ @ApiModelProperty(value = "Dirty message")
+ private String dirtyMessage;
+
+ @ApiModelProperty(value = "Ext info")
+ private String extInfo;
+
+ @ApiModelProperty(value = "Dirty data")
+ private String dirtyData;
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataRequest.java
similarity index 56%
copy from
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
copy to
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataRequest.java
index d8df7f4a28..8b3fc32a6f 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataRequest.java
@@ -19,25 +19,37 @@ package org.apache.inlong.manager.pojo.sink;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
import lombok.Data;
-import lombok.NoArgsConstructor;
+import lombok.EqualsAndHashCode;
+
+import java.util.List;
/**
- * The base parameter class of StreamSink, support user extend their own
business params.
+ * Query request for Dirty data
*/
@Data
-@AllArgsConstructor
-@NoArgsConstructor
-@ApiModel("Base info of stream sink")
-public class BaseStreamSink {
+@EqualsAndHashCode(callSuper = false)
+@ApiModel("Query request for Dirty data")
+public class DirtyDataRequest {
+
+ @ApiModelProperty(value = "Sink id list")
+ private List<Integer> sinkIdList;
+
+ @ApiModelProperty(value = "Key word")
+ private String keyword;
+
+ @ApiModelProperty(value = "Server type")
+ private String serverType;
+
+ @ApiModelProperty(value = "Dirty type")
+ private String dirtyType;
- @ApiModelProperty("Transform sql")
- private String transformSql;
+ @ApiModelProperty(value = "Start time")
+ private String startTime;
- @ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format")
- private String startConsumeTime;
+ @ApiModelProperty(value = "End time")
+ private String endTime;
- @ApiModelProperty("Stop consume time, yyyy-MM-dd HH:mm:ss format")
- private String stopConsumeTime;
+ @ApiModelProperty(value = "Data count")
+ private Integer dataCount = 10;
}
diff --git a/inlong-manager/manager-web/sql/changes-2.1.0.sql
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataResponse.java
similarity index 64%
copy from inlong-manager/manager-web/sql/changes-2.1.0.sql
copy to
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataResponse.java
index 0d4c984778..08203db09a 100644
--- a/inlong-manager/manager-web/sql/changes-2.1.0.sql
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataResponse.java
@@ -15,13 +15,20 @@
* limitations under the License.
*/
--- This is the SQL change file from version 1.4.0 to the current version 1.5.0.
--- When upgrading to version 1.5.0, please execute those SQLs in the DB (such
as MySQL) used by the Manager module.
+package org.apache.inlong.manager.pojo.sink;
-SET NAMES utf8mb4;
-SET FOREIGN_KEY_CHECKS = 0;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
-USE `apache_inlong_manager`;
+/**
+ * Dirty data info.
+ */
+@Data
+@ApiModel("Dirty data info")
+public class DirtyDataResponse {
+
+ @ApiModelProperty(value = "Task id")
+ private String taskId;
-ALTER TABLE `schedule_config`
- ADD COLUMN `schedule_engine` varchar(64) NOT NULL DEFAULT 'Quartz'
COMMENT 'Schedule engine, support Quartz, Airflow and DolphinScheduler';
+}
diff --git a/inlong-manager/manager-web/sql/changes-2.1.0.sql
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataTrendDetailResponse.java
similarity index 64%
copy from inlong-manager/manager-web/sql/changes-2.1.0.sql
copy to
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataTrendDetailResponse.java
index 0d4c984778..cc56cd93ee 100644
--- a/inlong-manager/manager-web/sql/changes-2.1.0.sql
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataTrendDetailResponse.java
@@ -15,13 +15,23 @@
* limitations under the License.
*/
--- This is the SQL change file from version 1.4.0 to the current version 1.5.0.
--- When upgrading to version 1.5.0, please execute those SQLs in the DB (such
as MySQL) used by the Manager module.
+package org.apache.inlong.manager.pojo.sink;
-SET NAMES utf8mb4;
-SET FOREIGN_KEY_CHECKS = 0;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
-USE `apache_inlong_manager`;
+/**
+ * Dirty data trend detail info.
+ */
+@Data
+@ApiModel("Dirty data trend detail info")
+public class DirtyDataTrendDetailResponse {
+
+ @ApiModelProperty(value = "Report time")
+ private String reportTime;
+
+ @ApiModelProperty(value = "Data count")
+ private String count;
-ALTER TABLE `schedule_config`
- ADD COLUMN `schedule_engine` varchar(64) NOT NULL DEFAULT 'Quartz'
COMMENT 'Schedule engine, support Quartz, Airflow and DolphinScheduler';
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataTrendRequest.java
similarity index 59%
copy from
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
copy to
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataTrendRequest.java
index d8df7f4a28..b0e52420a2 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/DirtyDataTrendRequest.java
@@ -19,25 +19,34 @@ package org.apache.inlong.manager.pojo.sink;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
import lombok.Data;
-import lombok.NoArgsConstructor;
+import lombok.EqualsAndHashCode;
+
+import java.util.List;
/**
- * The base parameter class of StreamSink, support user extend their own
business params.
+ * Query request for dirty data trend
*/
@Data
-@AllArgsConstructor
-@NoArgsConstructor
-@ApiModel("Base info of stream sink")
-public class BaseStreamSink {
+@EqualsAndHashCode(callSuper = false)
+@ApiModel("Query request for Dirty data")
+public class DirtyDataTrendRequest {
+
+ @ApiModelProperty(value = "Sink id list")
+ private List<Integer> sinkIdList;
+
+ @ApiModelProperty(value = "Data time unit")
+ private String dataTimeUnit;
+
+ @ApiModelProperty(value = "Server type")
+ private String serverType;
- @ApiModelProperty("Transform sql")
- private String transformSql;
+ @ApiModelProperty(value = "Dirty type")
+ private String dirtyType;
- @ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format")
- private String startConsumeTime;
+ @ApiModelProperty(value = "Start time")
+ private String startTime;
- @ApiModelProperty("Stop consume time, yyyy-MM-dd HH:mm:ss format")
- private String stopConsumeTime;
+ @ApiModelProperty(value = "End time")
+ private String endTime;
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
index 24c544b943..782817a50d 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
@@ -100,6 +100,9 @@ public abstract class SinkRequest {
@Range(min = 0, max = 1, message = "default is 1, only supports [0:
disable, 1: enable]")
private Integer enableCreateResource = 1;
+ @ApiModelProperty("Enable data archiving")
+ private Boolean enableDataArchiving;
+
@ApiModelProperty(value = "Whether to start the process after saving or
updating. Default is false")
private Boolean startProcess = false;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java
index 85fd72a1a4..79a5af3876 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java
@@ -90,6 +90,9 @@ public abstract class StreamSink extends StreamNode {
@ApiModelProperty(value = "Whether to enable create sink resource? 0:
disable, 1: enable. Default is 1", notes = "Such as enable or disable to create
Hive table")
private Integer enableCreateResource = 1;
+ @ApiModelProperty("Enable data archiving")
+ private Boolean enableDataArchiving;
+
@ApiModelProperty("Backend operation log")
private String operateLog;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/dirtyData/DirtyQueryLogService.java
similarity index 50%
copy from
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
copy to
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/dirtyData/DirtyQueryLogService.java
index d8df7f4a28..3629ee985c 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/dirtyData/DirtyQueryLogService.java
@@ -15,29 +15,29 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.pojo.sink;
+package org.apache.inlong.manager.service.dirtyData;
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.pojo.sink.DirtyDataDetailResponse;
+import org.apache.inlong.manager.pojo.sink.DirtyDataRequest;
+import org.apache.inlong.manager.pojo.sink.DirtyDataResponse;
+import org.apache.inlong.manager.pojo.sink.DirtyDataTrendDetailResponse;
+import org.apache.inlong.manager.pojo.sink.DirtyDataTrendRequest;
+
+import java.util.List;
/**
- * The base parameter class of StreamSink, support user extend their own
business params.
+ * Dirty query log service
*/
-@Data
-@AllArgsConstructor
-@NoArgsConstructor
-@ApiModel("Base info of stream sink")
-public class BaseStreamSink {
+public interface DirtyQueryLogService {
+
+ DirtyDataResponse listDirtyData(DirtyDataRequest request);
+
+ DirtyDataResponse listDirtyDataTrend(DirtyDataTrendRequest request);
+
+ List<DirtyDataDetailResponse> getDirtyData(String taskId);
- @ApiModelProperty("Transform sql")
- private String transformSql;
+ List<DirtyDataTrendDetailResponse> getDirtyDataTrend(String taskId);
- @ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format")
- private String startConsumeTime;
+ String getSqlTaskStatus(String taskId);
- @ApiModelProperty("Stop consume time, yyyy-MM-dd HH:mm:ss format")
- private String stopConsumeTime;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/dirtyData/impl/DirtyQueryLogServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/dirtyData/impl/DirtyQueryLogServiceImpl.java
new file mode 100644
index 0000000000..a63a320765
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/dirtyData/impl/DirtyQueryLogServiceImpl.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.dirtyData.impl;
+
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.dao.entity.DirtyQueryLogEntity;
+import org.apache.inlong.manager.dao.mapper.DirtyQueryLogEntityMapper;
+import org.apache.inlong.manager.pojo.sink.DirtyDataDetailResponse;
+import org.apache.inlong.manager.pojo.sink.DirtyDataRequest;
+import org.apache.inlong.manager.pojo.sink.DirtyDataResponse;
+import org.apache.inlong.manager.pojo.sink.DirtyDataTrendDetailResponse;
+import org.apache.inlong.manager.pojo.sink.DirtyDataTrendRequest;
+import org.apache.inlong.manager.pojo.user.LoginUserUtils;
+import org.apache.inlong.manager.service.dirtyData.DirtyQueryLogService;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Default implementation of {@link DirtyQueryLogService}.
+ *
+ * <p>Every dirty data query is identified by the MD5 of its JSON-serialized request. The request is
+ * recorded in the {@code dirty_query_log} table so that an identical request arriving while the record
+ * is still live reuses the recorded task id. An optional background task periodically marks old
+ * records as deleted.
+ */
+@Service
+@Slf4j
+public class DirtyQueryLogServiceImpl implements DirtyQueryLogService {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(DirtyQueryLogServiceImpl.class);
+
+    @Autowired
+    private DirtyQueryLogEntityMapper dirtyQueryLogEntityMapper;
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    // Whether the periodic clean task is started at all; off unless configured.
+    @Value("${dirty.log.clean.enabled:false}")
+    private Boolean dirtyLogCleanEnabled;
+    // Delay, in minutes, between the end of one clean run and the start of the next.
+    @Value("${dirty.log.clean.interval.minutes:5}")
+    private Integer dirtyLogCleanInterval;
+    // Age, in minutes, after which a query log is marked as deleted by the clean task.
+    // NOTE(review): the "dirty.dirty." prefix looks like a typo, but it is the key read at runtime - confirm
+    // against the application-*.properties files before renaming.
+    @Value("${dirty.dirty.retention.minutes:10}")
+    private Integer retentionMinutes;
+    // Not referenced by the code in this class yet (the query execution is still TODO).
+    @Value("${dirty.dirty.db.table:inlong_iceberg::dirty_data_achive_iceberg}")
+    private String dirtyDataDbTable;
+
+    /**
+     * Starts the periodic clean task when {@code dirty.log.clean.enabled} is true.
+     *
+     * <p>The task runs once immediately and then again {@code dirtyLogCleanInterval} minutes after each
+     * run finishes, on a single daemon thread.
+     */
+    @PostConstruct
+    private void startDirtyLogCleanTask() {
+        if (dirtyLogCleanEnabled) {
+            ThreadFactory factory = new ThreadFactoryBuilder()
+                    .setNameFormat("scheduled-dirtyQueryLog-deleted-%d")
+                    .setDaemon(true)
+                    .build();
+            ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(factory);
+            executor.scheduleWithFixedDelay(() -> {
+                try {
+                    LOGGER.info("begin to clean dirty query log");
+                    dirtyQueryLogEntityMapper.updateToTimeout(retentionMinutes);
+                    LOGGER.info("success to clean dirty query log successfully");
+                } catch (Throwable t) {
+                    // An exception escaping the task would suppress all later runs, so log and carry on.
+                    LOGGER.error("clean dirty query log error", t);
+                }
+            }, 0, dirtyLogCleanInterval, TimeUnit.MINUTES);
+            LOGGER.info("clean dirty query log started successfully");
+        }
+    }
+
+    /**
+     * Registers a dirty data trend query.
+     *
+     * @param request trend query; must contain at least one sink id
+     * @return a response carrying the task id of an already recorded identical request, a response
+     *         without task id when the request was newly recorded, or {@code null} when the sink id
+     *         list is empty
+     * @throws BusinessException when the request cannot be serialized or persisted
+     */
+    @Override
+    public DirtyDataResponse listDirtyDataTrend(DirtyDataTrendRequest request) {
+        if (CollectionUtils.isEmpty(request.getSinkIdList())) {
+            return null;
+        }
+        return findOrRecordQuery(request, "list dirty data trend");
+    }
+
+    /**
+     * Registers a dirty data query. A missing data count is replaced by 10 on the request itself
+     * before the request is hashed.
+     *
+     * @param request dirty data query; must contain at least one sink id
+     * @return a response carrying the task id of an already recorded identical request, a response
+     *         without task id when the request was newly recorded, or {@code null} when the sink id
+     *         list is empty
+     * @throws BusinessException when the request cannot be serialized or persisted
+     */
+    @Override
+    public DirtyDataResponse listDirtyData(DirtyDataRequest request) {
+        if (CollectionUtils.isEmpty(request.getSinkIdList())) {
+            return null;
+        }
+        if (request.getDataCount() == null) {
+            request.setDataCount(10);
+        }
+        return findOrRecordQuery(request, "list dirty data");
+    }
+
+    /**
+     * Looks up the query log for {@code request} by the MD5 of its JSON form and records a new log
+     * when none is live. The lookup and the insert are not atomic, so concurrent identical requests
+     * may each insert a row.
+     *
+     * @param request request object to serialize and hash
+     * @param operation operation name used as the prefix of the log and exception message
+     * @return the response for the existing or the newly recorded query
+     * @throws BusinessException with message {@code operation + " failed"} on any failure
+     */
+    private DirtyDataResponse findOrRecordQuery(Object request, String operation) {
+        try {
+            DirtyDataResponse dirtyDataResponse = new DirtyDataResponse();
+            String requestStr = objectMapper.writeValueAsString(request);
+            String md5 = DigestUtils.md5Hex(requestStr);
+            DirtyQueryLogEntity existingLog = dirtyQueryLogEntityMapper.selectByMd5(md5);
+            if (existingLog != null) {
+                LOGGER.info("dirty query log is exist");
+                dirtyDataResponse.setTaskId(existingLog.getTaskId());
+                return dirtyDataResponse;
+            }
+            String operator = LoginUserUtils.getLoginUser().getName();
+            DirtyQueryLogEntity dirtyQueryLog = new DirtyQueryLogEntity();
+            // TODO dirtyQueryLog.setTaskId();
+            dirtyQueryLog.setMd5(md5);
+            dirtyQueryLog.setRequestParams(requestStr);
+            dirtyQueryLog.setCreator(operator);
+            dirtyQueryLog.setModifier(operator);
+            dirtyQueryLogEntityMapper.insert(dirtyQueryLog);
+            return dirtyDataResponse;
+        } catch (Exception e) {
+            // Log the cause here: the BusinessException thrown below only carries a message.
+            LOGGER.error(operation + " failed", e);
+            throw new BusinessException(operation + " failed");
+        }
+    }
+
+    /**
+     * Returns the dirty data produced by a query task. Not implemented yet: always an empty list.
+     *
+     * @param taskId id of the query task
+     * @return currently an empty, mutable list
+     */
+    @Override
+    public List<DirtyDataDetailResponse> getDirtyData(String taskId) {
+        try {
+            // TODO
+            return new ArrayList<>();
+        } catch (Exception e) {
+            LOGGER.error("get dirty data failed", e);
+            throw new BusinessException("get dirty data failed");
+        }
+    }
+
+    /**
+     * Returns the dirty data trend produced by a query task. Not implemented yet: always an empty list.
+     *
+     * @param taskId id of the query task
+     * @return currently an empty, mutable list
+     */
+    @Override
+    public List<DirtyDataTrendDetailResponse> getDirtyDataTrend(String taskId) {
+        try {
+            // TODO
+            return new ArrayList<>();
+        } catch (Exception e) {
+            LOGGER.error("get dirty data trend failed", e);
+            throw new BusinessException("get dirty data trend failed");
+        }
+    }
+
+    /**
+     * Returns the status of a query task. Not implemented yet: always {@code "success"}.
+     *
+     * @param taskId id of the query task
+     * @return currently the constant {@code "success"}
+     */
+    @Override
+    public String getSqlTaskStatus(String taskId) {
+        try {
+            // TODO
+            return "success";
+        } catch (Exception e) {
+            LOGGER.error("get sql task status failed", e);
+            throw new BusinessException("get sql task status failed");
+        }
+    }
+}
diff --git
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 1d92f14f46..6836d44d5b 100644
---
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -1013,6 +1013,24 @@ CREATE TABLE IF NOT EXISTS `schedule_config`
UNIQUE KEY `unique_group_schedule_config` (`inlong_group_id`, `is_deleted`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT = 'schedule_config';
+
+-- ----------------------------
+-- Table structure for dirty_query_log
+-- Records dirty data query requests: `request_params` holds the request as a
+-- JSON string and `md5` holds the MD5 of those params (see column comments)
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `dirty_query_log`
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT
'Incremental primary key',
+ `md5` varchar(256) NOT NULL COMMENT 'Md5 for request params',
+ `request_params` mediumtext DEFAULT NULL COMMENT 'Request params,
will be saved as JSON string',
+ `task_id` varchar(256) DEFAULT '' COMMENT 'Task id',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete,
0: not deleted, > 0: deleted',
+ `creator` varchar(64) NOT NULL COMMENT 'Creator name',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON
UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version
number, which will be incremented by 1 after modification',
+ PRIMARY KEY (`id`)
+);
-- ----------------------------
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 58259c9047..51a72d6039 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -1066,6 +1066,25 @@ CREATE TABLE IF NOT EXISTS `schedule_config`
UNIQUE KEY `unique_group_schedule_config` (`inlong_group_id`, `is_deleted`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT = 'schedule_config';
+
+-- ----------------------------
+-- Table structure for dirty_query_log
+-- Records dirty data query requests: `request_params` holds the request as a
+-- JSON string and `md5` holds the MD5 of those params (see column comments)
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `dirty_query_log`
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT
'Incremental primary key',
+ `md5` varchar(256) NOT NULL COMMENT 'Md5 for request params',
+ `request_params` mediumtext DEFAULT NULL COMMENT 'Request params,
will be saved as JSON string',
+ `task_id` varchar(256) DEFAULT '' COMMENT 'Task id',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete,
0: not deleted, > 0: deleted',
+ `creator` varchar(64) NOT NULL COMMENT 'Creator name',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON
UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version
number, which will be incremented by 1 after modification',
+ PRIMARY KEY (`id`)
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8mb4 COMMENT ='Dirty query log table';
-- ----------------------------
SET FOREIGN_KEY_CHECKS = 1;
diff --git a/inlong-manager/manager-web/sql/changes-2.1.0.sql
b/inlong-manager/manager-web/sql/changes-2.1.0.sql
index 0d4c984778..2fa48f95b4 100644
--- a/inlong-manager/manager-web/sql/changes-2.1.0.sql
+++ b/inlong-manager/manager-web/sql/changes-2.1.0.sql
@@ -25,3 +25,19 @@ USE `apache_inlong_manager`;
ALTER TABLE `schedule_config`
ADD COLUMN `schedule_engine` varchar(64) NOT NULL DEFAULT 'Quartz'
COMMENT 'Schedule engine, support Quartz, Airflow and DolphinScheduler';
+
+-- Add the dirty_query_log table, which records dirty data query requests:
+-- `request_params` holds the request as a JSON string and `md5` holds the
+-- MD5 of those params (see column comments)
+CREATE TABLE IF NOT EXISTS `dirty_query_log`
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT
'Incremental primary key',
+ `md5` varchar(256) NOT NULL COMMENT 'Md5 for request params',
+ `request_params` mediumtext DEFAULT NULL COMMENT 'Request params,
will be saved as JSON string',
+ `task_id` varchar(256) DEFAULT '' COMMENT 'Task id',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete,
0: not deleted, > 0: deleted',
+ `creator` varchar(64) NOT NULL COMMENT 'Creator name',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON
UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version
number, which will be incremented by 1 after modification',
+ PRIMARY KEY (`id`)
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8mb4 COMMENT ='Dirty query log table';
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
index b7b8dbd5d4..331e3a5355 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
@@ -25,12 +25,18 @@ import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.common.UpdateResult;
+import org.apache.inlong.manager.pojo.sink.DirtyDataDetailResponse;
+import org.apache.inlong.manager.pojo.sink.DirtyDataRequest;
+import org.apache.inlong.manager.pojo.sink.DirtyDataResponse;
+import org.apache.inlong.manager.pojo.sink.DirtyDataTrendDetailResponse;
+import org.apache.inlong.manager.pojo.sink.DirtyDataTrendRequest;
import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.user.LoginUserUtils;
+import org.apache.inlong.manager.service.dirtyData.DirtyQueryLogService;
import org.apache.inlong.manager.service.operationlog.OperationLog;
import org.apache.inlong.manager.service.sink.StreamSinkService;
@@ -60,6 +66,8 @@ public class StreamSinkController {
@Autowired
private StreamSinkService sinkService;
+ @Autowired
+ private DirtyQueryLogService dirtyQueryLogService;
@RequestMapping(value = "/sink/save", method = RequestMethod.POST)
@OperationLog(operation = OperationType.CREATE, operationTarget =
OperationTarget.SINK)
@@ -143,4 +151,34 @@ public class StreamSinkController {
return Response.success(sinkService.parseFields(parseFieldRequest));
}
+ /**
+ * Handles POST /sink/listDirtyData: passes the request body to
+ * {@code dirtyQueryLogService.listDirtyData} and wraps the result in a success response.
+ */
+ @RequestMapping(value = "/sink/listDirtyData", method = RequestMethod.POST)
+ @ApiOperation(value = "List dirty data")
+ public Response<DirtyDataResponse> listDirtyData(@RequestBody DirtyDataRequest request) {
+ return Response.success(dirtyQueryLogService.listDirtyData(request));
+ }
+
+ /**
+ * Handles POST /sink/listDirtyDataTrend: passes the request body to
+ * {@code dirtyQueryLogService.listDirtyDataTrend} and wraps the result in a success response.
+ */
+ @RequestMapping(value = "/sink/listDirtyDataTrend", method = RequestMethod.POST)
+ @ApiOperation(value = "List dirty data trend")
+ public Response<DirtyDataResponse> listDirtyDataTrend(@RequestBody DirtyDataTrendRequest request) {
+ return Response.success(dirtyQueryLogService.listDirtyDataTrend(request));
+ }
+
+ /**
+ * Handles GET /sink/getDirtyData/{taskId}: passes the path variable to
+ * {@code dirtyQueryLogService.getDirtyData} and wraps the result in a success response.
+ */
+ @RequestMapping(value = "/sink/getDirtyData/{taskId}", method = RequestMethod.GET)
+ @ApiOperation(value = "Get dirty data by task id")
+ @ApiImplicitParam(name = "taskId", dataTypeClass = String.class, required = true)
+ public Response<List<DirtyDataDetailResponse>> getDirtyData(@PathVariable String taskId) {
+ return Response.success(dirtyQueryLogService.getDirtyData(taskId));
+ }
+
+ /**
+ * Handles GET /sink/getDirtyDataTrend/{taskId}: passes the path variable to
+ * {@code dirtyQueryLogService.getDirtyDataTrend} and wraps the result in a success response.
+ */
+ @RequestMapping(value = "/sink/getDirtyDataTrend/{taskId}", method = RequestMethod.GET)
+ @ApiOperation(value = "Get dirty data trend by task id")
+ @ApiImplicitParam(name = "taskId", dataTypeClass = String.class, required = true)
+ public Response<List<DirtyDataTrendDetailResponse>> getDirtyDataTrend(@PathVariable String taskId) {
+ return Response.success(dirtyQueryLogService.getDirtyDataTrend(taskId));
+ }
+
+ /**
+ * Handles GET /sink/SqlTaskStatus/{taskId}: passes the path variable to
+ * {@code dirtyQueryLogService.getSqlTaskStatus} and wraps the result in a success response.
+ * The method name and URL keep their original capitalization so existing callers still work.
+ */
+ @RequestMapping(value = "/sink/SqlTaskStatus/{taskId}", method = RequestMethod.GET)
+ @ApiOperation(value = "Get SQL task status by task id")
+ @ApiImplicitParam(name = "taskId", dataTypeClass = String.class, required = true)
+ public Response<String> SqlTaskStatus(@PathVariable String taskId) {
+ return Response.success(dirtyQueryLogService.getSqlTaskStatus(taskId));
+ }
+
}
diff --git
a/inlong-manager/manager-web/src/main/resources/application-dev.properties
b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index 794f201bda..2bad5f801f 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -105,4 +105,9 @@ agent.install.temp.path=inlong/agent-installer-temp/
default.module.id=1
# schedule engine type
# support none(no scheduler) and quartz(quartz scheduler), default is none
-inlong.schedule.engine=none
\ No newline at end of file
+inlong.schedule.engine=none
+
+# dirty data query settings (presumably consumed by the dirty data query feature; verify against the reader)
+# NOTE(review): the last two keys repeat the prefix ('dirty.dirty.*') while the first two use
+# 'dirty.log.*', and the table name spells 'achive'; confirm both against the code and table before renaming
+dirty.log.clean.enabled=false
+dirty.log.clean.interval.minutes=5
+dirty.dirty.retention.minutes=10
+dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg
\ No newline at end of file
diff --git
a/inlong-manager/manager-web/src/main/resources/application-prod.properties
b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index 3e8f329470..040c868bcf 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -97,4 +97,9 @@ cls.manager.endpoint=127.0.0.1
# schedule engine type
# support none(no scheduler) and quartz(quartz scheduler), default is none
-inlong.schedule.engine=none
\ No newline at end of file
+inlong.schedule.engine=none
+
+# dirty data query settings (presumably consumed by the dirty data query feature; verify against the reader)
+# NOTE(review): the last two keys repeat the prefix ('dirty.dirty.*') while the first two use
+# 'dirty.log.*', and the table name spells 'achive'; confirm both against the code and table before renaming
+dirty.log.clean.enabled=false
+dirty.log.clean.interval.minutes=5
+dirty.dirty.retention.minutes=10
+dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg
\ No newline at end of file
diff --git
a/inlong-manager/manager-web/src/main/resources/application-test.properties
b/inlong-manager/manager-web/src/main/resources/application-test.properties
index 5ff929c2b8..393eef6b05 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -98,4 +98,9 @@ cls.manager.endpoint=127.0.0.1
# schedule engine type
# support none(no scheduler) and quartz(quartz scheduler), default is none
-inlong.schedule.engine=none
\ No newline at end of file
+inlong.schedule.engine=none
+
+# dirty data query settings (presumably consumed by the dirty data query feature; verify against the reader)
+# NOTE(review): the last two keys repeat the prefix ('dirty.dirty.*') while the first two use
+# 'dirty.log.*', and the table name spells 'achive'; confirm both against the code and table before renaming
+dirty.log.clean.enabled=false
+dirty.log.clean.interval.minutes=5
+dirty.dirty.retention.minutes=10
+dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg
\ No newline at end of file