This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 87830ee4d7 [INLONG-8405][Manager] Dynamically configure ClickHouse 
source for Audit (#8424)
87830ee4d7 is described below

commit 87830ee4d7a03f249925efac13320f274d2d46db
Author: megru <[email protected]>
AuthorDate: Sat Jul 8 12:18:00 2023 +0800

    [INLONG-8405][Manager] Dynamically configure ClickHouse source for Audit 
(#8424)
    
    * [INLONG-8405][Manager] Dynamically configure ClickHouse source for Audit
    
    * [INLONG-8405][Manager] Optimize the DDL of audit_source table
    
    * [INLONG-8405][Manager] Refactor the audit source related interface
    
    ---------
    
    Co-authored-by: healchow <[email protected]>
---
 .../manager/dao/entity/AuditSourceEntity.java      | 46 +++++++++++
 .../dao/mapper/AuditSourceEntityMapper.java        | 37 +++++++++
 .../resources/mappers/AuditSourceEntityMapper.xml  | 71 +++++++++++++++++
 .../manager/pojo/audit/AuditSourceRequest.java     | 66 ++++++++++++++++
 .../manager/pojo/audit/AuditSourceResponse.java    | 73 +++++++++++++++++
 .../inlong/manager/service/core/AuditService.java  | 19 +++++
 .../service/core/impl/AuditServiceImpl.java        | 54 ++++++++++++-
 .../service/resource/sink/ck/ClickHouseConfig.java | 91 ++++++++++++++--------
 .../service/core/impl/AuditServiceTest.java        | 29 ++++++-
 .../main/resources/h2/apache_inlong_manager.sql    | 23 ++++++
 .../manager-web/sql/apache_inlong_manager.sql      | 24 ++++++
 inlong-manager/manager-web/sql/changes-1.8.0.sql   | 23 ++++++
 .../manager/web/controller/AuditController.java    | 17 ++++
 13 files changed, 537 insertions(+), 36 deletions(-)

diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/AuditSourceEntity.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/AuditSourceEntity.java
new file mode 100644
index 0000000000..752e790ce6
--- /dev/null
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/AuditSourceEntity.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.dao.entity;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * Audit source entity, one row of the audit_source table: id, name, type, url, auth info, etc.
+ */
+@Data
+public class AuditSourceEntity implements Serializable {
+
+    private Integer id; // primary key, filled from the generated key on insert
+    private String name;
+    private String type; // MYSQL, CLICKHOUSE or ELASTICSEARCH
+    private String url; // jdbcUrl for MYSQL/CLICKHOUSE, hostname:port access URL for ELASTICSEARCH
+    private Integer enableAuth; // 0: disable, 1: enable
+    private String username; // needed if enableAuth is 1
+    private String token; // needed if enableAuth is 1
+    private Integer status; // 0: offline, 1: online
+    private Integer isDeleted; // 0: not deleted, > 0: deleted
+    private String creator;
+    private String modifier;
+    private Date createTime;
+    private Date modifyTime;
+    private Integer version; // per the table DDL: incremented by 1 after modification
+
+}
diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditSourceEntityMapper.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditSourceEntityMapper.java
new file mode 100644
index 0000000000..d496f8570f
--- /dev/null
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditSourceEntityMapper.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.dao.mapper;
+
+import org.apache.inlong.manager.dao.entity.AuditSourceEntity;
+
+import org.apache.ibatis.annotations.Param;
+import org.springframework.stereotype.Repository;
+
+/**
+ * Audit source mapper, the SQL statements are defined in AuditSourceEntityMapper.xml
+ */
+@Repository
+public interface AuditSourceEntityMapper {
+
+    int insert(AuditSourceEntity record); // the generated key is written back to record.id
+
+    AuditSourceEntity selectOnlineSource(); // one row with status = 1 and is_deleted = 0; callers check for null
+
+    void offlineSourceByUrl(@Param("offlineUrl") String offlineUrl); // sets status = 0 for the not deleted row(s) with this url
+
+}
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/AuditSourceEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/AuditSourceEntityMapper.xml
new file mode 100644
index 0000000000..da7d111782
--- /dev/null
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/AuditSourceEntityMapper.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" 
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper 
namespace="org.apache.inlong.manager.dao.mapper.AuditSourceEntityMapper">
+    <!-- Column to property mapping shared by the queries on audit_source -->
+    <resultMap id="BaseResultMap" 
type="org.apache.inlong.manager.dao.entity.AuditSourceEntity">
+        <result column="id" jdbcType="INTEGER" property="id"/>
+        <result column="name" jdbcType="VARCHAR" property="name"/>
+        <result column="type" jdbcType="VARCHAR" property="type"/>
+        <result column="url" jdbcType="VARCHAR" property="url"/>
+        <result column="enable_auth" jdbcType="TINYINT" property="enableAuth"/>
+        <result column="username" jdbcType="VARCHAR" property="username"/>
+        <result column="token" jdbcType="VARCHAR" property="token"/>
+        <result column="status" jdbcType="SMALLINT" property="status"/>
+        <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
+        <result column="creator" jdbcType="VARCHAR" property="creator"/>
+        <result column="modifier" jdbcType="VARCHAR" property="modifier"/>
+        <result column="create_time" jdbcType="TIMESTAMP" 
property="createTime"/>
+        <result column="modify_time" jdbcType="TIMESTAMP" 
property="modifyTime"/>
+        <result column="version" jdbcType="INTEGER" property="version"/>
+    </resultMap>
+    <sql id="Base_Column_List">
+        id, name, type, url, enable_auth, username, token, status, is_deleted,
+        creator, modifier, create_time, modify_time, version
+    </sql>
+
+    <!-- Insert one audit source; the generated key is written back to the id property.
+         is_deleted, create_time, modify_time and version are left to the column defaults. -->
+    <insert id="insert" useGeneratedKeys="true" keyProperty="id"
+            
parameterType="org.apache.inlong.manager.dao.entity.AuditSourceEntity">
+        insert into audit_source (id, name, type, url,
+                                  enable_auth, username, token,
+                                  status, creator, modifier)
+        values (#{id, jdbcType=INTEGER}, #{name, jdbcType=VARCHAR}, 
#{type,jdbcType=VARCHAR}, #{url, jdbcType=VARCHAR},
+                #{enableAuth, jdbcType=TINYINT}, #{username, 
jdbcType=VARCHAR}, #{token, jdbcType=VARCHAR},
+                #{status, jdbcType=SMALLINT}, #{creator, jdbcType=VARCHAR}, 
#{modifier, jdbcType=VARCHAR})
+    </insert>
+
+    <!-- Query one online (status = 1) and not deleted audit source.
+         Only resultMap is declared: resultType and resultMap must not be used together. -->
+    <select id="selectOnlineSource" resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List"/>
+        from audit_source
+        where status = 1
+        and is_deleted = 0
+        limit 1
+    </select>
+
+    <!-- Offline (status = 0) the not deleted audit source with the given url -->
+    <update id="offlineSourceByUrl">
+        update audit_source
+        set status = 0
+        where url = #{offlineUrl, jdbcType=VARCHAR}
+        and is_deleted = 0
+    </update>
+
+</mapper>
\ No newline at end of file
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditSourceRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditSourceRequest.java
new file mode 100644
index 0000000000..63e0eedecd
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditSourceRequest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.audit;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import javax.validation.constraints.NotBlank;
+
+/**
+ * Audit source request, describes the audit source to save and, optionally, the url to offline
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Audit source request")
+public class AuditSourceRequest {
+
+    @NotBlank
+    @ApiModelProperty(value = "Audit source name", required = true)
+    private String name;
+
+    @NotBlank
+    @ApiModelProperty(value = "Audit source type, including: MYSQL, 
CLICKHOUSE, ELASTICSEARCH", required = true)
+    private String type;
+
+    @NotBlank
+    @ApiModelProperty(value = "Audit source URL, for MYSQL or CLICKHOUSE, is 
jdbcUrl, and for ELASTICSEARCH is the access URL with hostname:port", required 
= true)
+    private String url;
+
+    @ApiModelProperty(value = "Offline the url if not null")
+    private String offlineUrl;
+
+    @ApiModelProperty(value = "Enable auth or not, 0: disable, 1: enable")
+    private Integer enableAuth;
+
+    @ApiModelProperty(value = "Audit source username, needed if enableAuth is 
1")
+    private String username;
+
+    @ApiModelProperty(value = "Audit source token, needed if enableAuth is 1")
+    private String token;
+
+    @ApiModelProperty(value = "Version number")
+    private Integer version;
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditSourceResponse.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditSourceResponse.java
new file mode 100644
index 0000000000..85125d4033
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditSourceResponse.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.audit;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+import java.util.Date;
+
+/**
+ * Audit source response, the view of an audit source returned by the audit service
+ */
+@Data
+@ApiModel("Audit source response")
+public class AuditSourceResponse {
+
+    @ApiModelProperty(value = "Primary key")
+    private Integer id;
+
+    @ApiModelProperty(value = "Audit source name")
+    private String name;
+
+    @ApiModelProperty(value = "Audit source type, including: MYSQL, 
CLICKHOUSE, ELASTICSEARCH", required = true)
+    private String type;
+
+    @ApiModelProperty(value = "Audit source URL, for MYSQL or CLICKHOUSE, is 
jdbcUrl, and for ELASTICSEARCH is the access URL with hostname:port", required 
= true)
+    private String url;
+
+    @ApiModelProperty(value = "Offline the url if not null")
+    private String offlineUrl; // NOTE(review): AuditSourceEntity has no offlineUrl property, presumably always null here - confirm
+
+    @ApiModelProperty(value = "Enable auth or not, 0: disable, 1: enable")
+    private Integer enableAuth;
+
+    @ApiModelProperty(value = "Audit source username, needed if enableAuth is 
1")
+    private String username;
+
+    @ApiModelProperty(value = "Audit source token, needed if enableAuth is 1")
+    private String token; // NOTE(review): the credential is part of the response - confirm this is intended
+
+    @ApiModelProperty(value = "Creator")
+    private String creator;
+
+    @ApiModelProperty(value = "Modifier")
+    private String modifier;
+
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+    private Date createTime; // serialized as yyyy-MM-dd HH:mm:ss
+
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+    private Date modifyTime; // serialized as yyyy-MM-dd HH:mm:ss
+
+    @ApiModelProperty(value = "Version number")
+    private Integer version;
+
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java
index 1d2407a2c7..5068883b73 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java
@@ -18,6 +18,8 @@
 package org.apache.inlong.manager.service.core;
 
 import org.apache.inlong.manager.pojo.audit.AuditRequest;
+import org.apache.inlong.manager.pojo.audit.AuditSourceRequest;
+import org.apache.inlong.manager.pojo.audit.AuditSourceResponse;
 import org.apache.inlong.manager.pojo.audit.AuditVO;
 
 import java.util.List;
@@ -51,4 +53,21 @@ public interface AuditService {
      */
     Boolean refreshBaseItemCache();
 
+    /**
+     * Offline the old audit source through url, and insert and online a new 
audit source.
+     * NOTE(review): AuditServiceImpl in this change always inserts; updating an 
already existing url is still a TODO there.
+     *
+     * @param request audit source request, its offlineUrl (if not blank) is offlined first
+     * @param operator current operator, saved as creator and modifier
+     * @return id of the saved audit source
+     */
+    Integer updateAuditSource(AuditSourceRequest request, String operator);
+
+    /**
+     * Get audit source that is online.
+     *
+     * @return response of the online audit source; AuditServiceImpl throws RECORD_NOT_FOUND if there is none
+     */
+    AuditSourceResponse getAuditSource();
+
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
index 1e08785c6e..1552b13aa8 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
@@ -17,21 +17,28 @@
 
 package org.apache.inlong.manager.service.core.impl;
 
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.AuditQuerySource;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.TimeStaticsDim;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.AuditBaseEntity;
+import org.apache.inlong.manager.dao.entity.AuditSourceEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
 import org.apache.inlong.manager.dao.mapper.AuditBaseEntityMapper;
 import org.apache.inlong.manager.dao.mapper.AuditEntityMapper;
+import org.apache.inlong.manager.dao.mapper.AuditSourceEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
 import org.apache.inlong.manager.pojo.audit.AuditInfo;
 import org.apache.inlong.manager.pojo.audit.AuditRequest;
+import org.apache.inlong.manager.pojo.audit.AuditSourceRequest;
+import org.apache.inlong.manager.pojo.audit.AuditSourceResponse;
 import org.apache.inlong.manager.pojo.audit.AuditVO;
 import org.apache.inlong.manager.pojo.user.LoginUserUtils;
 import org.apache.inlong.manager.pojo.user.UserRoleCode;
@@ -61,7 +68,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
 
 import javax.annotation.PostConstruct;
 
@@ -113,7 +119,7 @@ public class AuditServiceImpl implements AuditService {
     private List<String> auditIdListForUser;
 
     @Value("${audit.query.source}")
-    private String auditQuerySource = AuditQuerySource.MYSQL.name();
+    private String auditQuerySource;
 
     @Autowired
     private AuditBaseEntityMapper auditBaseMapper;
@@ -125,6 +131,10 @@ public class AuditServiceImpl implements AuditService {
     private StreamSinkEntityMapper sinkEntityMapper;
     @Autowired
     private StreamSourceEntityMapper sourceEntityMapper;
+    @Autowired
+    private ClickHouseConfig config;
+    @Autowired
+    private AuditSourceEntityMapper auditSourceMapper;
 
     @PostConstruct
     public void initialize() {
@@ -137,7 +147,6 @@ public class AuditServiceImpl implements AuditService {
     }
 
     @Override
-    @Transactional(rollbackFor = Exception.class)
     public Boolean refreshBaseItemCache() {
         LOGGER.debug("start to reload audit base item info");
         try {
@@ -159,6 +168,41 @@ public class AuditServiceImpl implements AuditService {
         return true;
     }
 
+    @Override
+    public Integer updateAuditSource(AuditSourceRequest request, String 
operator) { // offline the old source (optional), insert a new one, then refresh the ClickHouse config
+        String offlineUrl = request.getOfflineUrl();
+        if (StringUtils.isNotBlank(offlineUrl)) { // nothing is offlined when no url is given
+            auditSourceMapper.offlineSourceByUrl(offlineUrl);
+            LOGGER.info("success offline the audit source with url: {}", 
offlineUrl);
+        }
+
+        // TODO firstly we should check to see if it exists, updated if it 
exists, and created if it doesn't exist
+        AuditSourceEntity entity = CommonBeanUtils.copyProperties(request, 
AuditSourceEntity::new);
+        entity.setStatus(InlongConstants.DEFAULT_ENABLE_VALUE); // presumably 1 (online) - TODO confirm
+        entity.setCreator(operator);
+        entity.setModifier(operator);
+        auditSourceMapper.insert(entity);
+        Integer id = entity.getId(); // generated key written back by the mapper insert
+        LOGGER.info("success to insert audit source with id={}", id);
+
+        // TODO we should select the config that needs to be updated according 
to the source type
+        config.updateRuntimeConfig(); // logs and swallows its own failures
+        LOGGER.info("success to update audit source with id={}", id); // NOTE(review): also logged when the refresh above failed
+
+        return id;
+    }
+
+    @Override
+    public AuditSourceResponse getAuditSource() { // returns the online audit source, fails if there is none
+        AuditSourceEntity entity = auditSourceMapper.selectOnlineSource();
+        if (entity == null) { // no row with status = 1 and is_deleted = 0
+            throw new BusinessException(ErrorCodeEnum.RECORD_NOT_FOUND);
+        }
+
+        LOGGER.debug("success to get audit source, id={}", entity.getId());
+        return CommonBeanUtils.copyProperties(entity, 
AuditSourceResponse::new);
+    }
+
     @Override
     public String getAuditId(String type, boolean isSent) {
         if (StringUtils.isBlank(type)) {
@@ -249,9 +293,10 @@ public class AuditServiceImpl implements AuditService {
                     }
                 }
             } else if (AuditQuerySource.CLICKHOUSE == querySource) {
-                try (Connection connection = 
ClickHouseConfig.getCkConnection();
+                try (Connection connection = config.getCkConnection();
                         PreparedStatement statement = 
getAuditCkStatement(connection, groupId, streamId, auditId,
                                 request.getStartDate(), request.getEndDate());
+
                         ResultSet resultSet = statement.executeQuery()) {
                     List<AuditInfo> auditSet = new ArrayList<>();
                     while (resultSet.next()) {
@@ -430,4 +475,5 @@ public class AuditServiceImpl implements AuditService {
         }
         return formatDateString;
     }
+
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseConfig.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseConfig.java
index 8755fcaa39..72000ba421 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseConfig.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseConfig.java
@@ -17,59 +17,88 @@
 
 package org.apache.inlong.manager.service.resource.sink.ck;
 
+import org.apache.inlong.manager.dao.entity.AuditSourceEntity;
+import org.apache.inlong.manager.dao.mapper.AuditSourceEntityMapper;
+
 import com.alibaba.druid.pool.DruidDataSourceFactory;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
 import javax.sql.DataSource;
 
 import java.sql.Connection;
+import java.util.Objects;
 import java.util.Properties;
 
 /**
- * Clickhouse config information, including url, user, etc.
+ * ClickHouse config, including url, user, etc.
  */
-@Component
+@Slf4j
+@Service
 public class ClickHouseConfig {
 
+    @Autowired
+    private AuditSourceEntityMapper auditSourceMapper;
     private static volatile DataSource source;
+    private static volatile String currentJdbcUrl = null;
+    private static volatile String currentUsername = null;
+    private static volatile String currentPassword = null;
 
-    private static String jdbcUrl;
-
-    private static String username;
-
-    private static String password;
+    /**
+     * Update the runtime config of ClickHouse connection.
+     * <p>
+     * Reads the online audit source and rebuilds the shared data source only when there is
+     * no data source yet, or the jdbc url, username or password differ from the applied ones.
+     * Any failure is logged and swallowed, so the previous data source stays in place.
+     */
+    public synchronized void updateRuntimeConfig() {
+        try {
+            AuditSourceEntity auditSource = 
auditSourceMapper.selectOnlineSource();
+            if (auditSource == null) {
+                log.warn("no online audit source found for CLICKHOUSE");
+                return;
+            }
+            String jdbcUrl = auditSource.getUrl();
+            String username = auditSource.getUsername();
+            String password = StringUtils.isBlank(auditSource.getToken()) ? "" 
: auditSource.getToken();
 
-    @Value("${audit.ck.jdbcUrl}")
-    public void setUrl(String jdbcUrl) {
-        ClickHouseConfig.jdbcUrl = jdbcUrl;
-    }
+            // a difference in ANY connection property means the data source must be rebuilt
+            boolean changed = source == null
+                    || !Objects.equals(currentJdbcUrl, jdbcUrl)
+                    || !Objects.equals(currentUsername, username)
+                    || !Objects.equals(currentPassword, password);
+            if (changed) {
 
-    @Value("${audit.ck.username}")
-    public void setUsername(String username) {
-        ClickHouseConfig.username = username;
-    }
+                Properties pros = new Properties();
+                pros.put("url", jdbcUrl);
+                if (StringUtils.isNotBlank(username)) {
+                    pros.put("username", username);
+                }
+                if (StringUtils.isNotBlank(password)) {
+                    pros.put("password", password);
+                }
 
-    @Value("${audit.ck.password}")
-    public void setPassword(String password) {
-        ClickHouseConfig.password = password;
+                source = DruidDataSourceFactory.createDataSource(pros);
+                // remember the applied values only after the data source was created,
+                // so that a failed attempt is retried on the next call
+                currentJdbcUrl = jdbcUrl;
+                currentUsername = username;
+                currentPassword = password;
+                log.info("success to create connection to {}", jdbcUrl);
+            }
+        } catch (Exception e) {
+            log.error("failed to read click house audit source: ", e);
+        }
     }
 
     /**
      * Get ClickHouse connection from data source
      */
-    public static Connection getCkConnection() throws Exception {
+    public Connection getCkConnection() throws Exception {
+        log.debug("start to get connection for CLICKHOUSE");
+        int retry = 0;
+        while (source == null && retry < 3) { // the data source is built lazily, try at most 3 times
+            updateRuntimeConfig(); // swallows its own errors, so source may still be null afterwards
+            retry += 1;
+        }
+
         if (source == null) {
-            synchronized (ClickHouseConfig.class) {
-                if (source == null) {
-                    Properties pros = new Properties();
-                    pros.put("url", jdbcUrl);
-                    pros.put("username", username);
-                    pros.put("password", password);
-                    source = DruidDataSourceFactory.createDataSource(pros);
-                }
-            }
+            log.warn("jdbc source is null for CLICKHOUSE");
+            return null; // NOTE(review): callers get null instead of an exception and must handle it
         }
-        return source.getConnection();
+
+        Connection connection = source.getConnection();
+        log.info("success to get connection for CLICKHOUSE");
+        return connection;
     }
 }
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java
index 010a1bd38a..7909b0e012 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.core.impl;
 
 import org.apache.inlong.manager.pojo.audit.AuditInfo;
 import org.apache.inlong.manager.pojo.audit.AuditRequest;
+import org.apache.inlong.manager.pojo.audit.AuditSourceRequest;
 import org.apache.inlong.manager.pojo.audit.AuditVO;
 import org.apache.inlong.manager.service.ServiceBaseTest;
 import org.apache.inlong.manager.service.core.AuditService;
@@ -41,7 +42,7 @@ class AuditServiceTest extends ServiceBaseTest {
     private AuditService auditService;
 
     @Test
-    void testQueryFromMySQL() throws IOException {
+    void testQueryFromMySQL() {
         AuditRequest request = new AuditRequest();
         request.setAuditIds(Arrays.asList("3", "4"));
         request.setInlongGroupId("g1");
@@ -75,4 +76,30 @@ class AuditServiceTest extends ServiceBaseTest {
         request.setEndDate("2022-01-01");
         Assertions.assertNotNull(auditService.listByCondition(request));
     }
+
+    @Test
+    void testUpdateAuditSource() {
+        // save and online a first source without offlining anything
+        AuditSourceRequest request1 = AuditSourceRequest.builder()
+                .name("source_ch_1")
+                .type("CLICKHOUSE")
+                .url("jdbc:clickhouse://127.0.0.1:8123/db1")
+                .offlineUrl(null)
+                .enableAuth(0)
+                .build();
+        auditService.updateAuditSource(request1, GLOBAL_OPERATOR);
+
+        // save a second source and offline the first one through its url
+        AuditSourceRequest request2 = AuditSourceRequest.builder()
+                .name("source_ch_2")
+                .type("CLICKHOUSE")
+                .url("jdbc:clickhouse://127.0.0.1:8123/db2")
+                .offlineUrl("jdbc:clickhouse://127.0.0.1:8123/db1")
+                .enableAuth(1)
+                .username("default")
+                .token("123456")
+                .build();
+        auditService.updateAuditSource(request2, GLOBAL_OPERATOR);
+
+        // expected value first, actual value second
+        Assertions.assertEquals(request2.getUrl(), 
auditService.getAuditSource().getUrl());
+    }
+
 }
diff --git 
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql 
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index fe5f8d2d3d..a12c0dcba6 100644
--- 
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ 
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -846,6 +846,29 @@ VALUES ('audit_sdk_collect', 'SDK', 0, '1'),
        ('audit_sort_postgres_input', 'POSTGRESQL', 0, '27'),
        ('audit_sort_postgres_output', 'POSTGRESQL', 1, '28');
 
+-- ----------------------------
+-- Table structure for audit_source
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `audit_source`
+(
+    `id`          int(11)      NOT NULL AUTO_INCREMENT,
+    `name`        varchar(128) NOT NULL COMMENT 'Audit source name',
+    `type`        varchar(20)  NOT NULL COMMENT 'Audit source type, including: 
MYSQL, CLICKHOUSE, ELASTICSEARCH',
+    `url`         varchar(256) NOT NULL COMMENT 'Audit source URL, for MYSQL 
or CLICKHOUSE, is jdbcUrl, and for ELASTICSEARCH is the access URL with 
hostname:port',
+    `enable_auth` tinyint(1)            DEFAULT '1' COMMENT 'Enable auth or 
not, 0: disable, 1: enable',
+    `username`    varchar(128)          COMMENT 'Audit source username, needed 
if enable_auth is 1' ,
+    `token`       varchar(512)          DEFAULT NULL COMMENT 'Audit source 
token, needed if enable_auth is 1',
+    `status`      smallint(4)  NOT NULL DEFAULT '1' COMMENT 'Whether the audit 
source is online or offline, 0: offline, 1: online' ,
+    `is_deleted`  int(11)               DEFAULT '0' COMMENT 'Whether to 
delete, 0: not deleted, > 0: deleted',
+    `creator`     varchar(64)  NOT NULL COMMENT 'Creator name',
+    `modifier`    varchar(64)  NOT NULL COMMENT 'Modifier name',
+    `create_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 
'Create time',
+    `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE 
CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `version`     int(11)      NOT NULL DEFAULT '1' COMMENT 'Version number, 
which will be incremented by 1 after modification',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_audit_source` (url, `is_deleted`)
+);
+
 -- ----------------------------
 
 SET FOREIGN_KEY_CHECKS = 1;
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql 
b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 3301ed56d7..c38e413f30 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -893,6 +893,30 @@ VALUES ('audit_sdk_collect', 'SDK', 0, '1'),
        ('audit_sort_postgres_input', 'POSTGRESQL', 0, '27'),
        ('audit_sort_postgres_output', 'POSTGRESQL', 1, '28');
 
+-- ----------------------------
+-- Table structure for audit_source
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `audit_source`
+(
+    `id`          int(11)      NOT NULL AUTO_INCREMENT,
+    `name`        varchar(128) NOT NULL COMMENT 'Audit source name',
+    `type`        varchar(20)  NOT NULL COMMENT 'Audit source type, including: MYSQL, CLICKHOUSE, ELASTICSEARCH',
+    `url`         varchar(256) NOT NULL COMMENT 'Audit source URL, for MYSQL or CLICKHOUSE, is jdbcUrl, and for ELASTICSEARCH is the access URL with hostname:port',
+    `enable_auth` tinyint(1)            DEFAULT '1' COMMENT 'Enable auth or not, 0: disable, 1: enable',
+    `username`    varchar(128)          COMMENT 'Audit source username, needed if enable_auth is 1',
+    `token`       varchar(512)          DEFAULT NULL COMMENT 'Audit source token, needed if enable_auth is 1',
+    `status`      smallint(4)  NOT NULL DEFAULT '1' COMMENT 'Whether the audit source is online or offline, 0: offline, 1: online',
+    `is_deleted`  int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `creator`     varchar(64)  NOT NULL COMMENT 'Creator name',
+    `modifier`    varchar(64)  NOT NULL COMMENT 'Modifier name',
+    `create_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `version`     int(11)      NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_audit_source` (`url`, `is_deleted`)
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8mb4 COMMENT ='Audit source table';
+
 -- ----------------------------
 
 SET FOREIGN_KEY_CHECKS = 1;
diff --git a/inlong-manager/manager-web/sql/changes-1.8.0.sql b/inlong-manager/manager-web/sql/changes-1.8.0.sql
index 51397ad8a5..edd0abb7ac 100644
--- a/inlong-manager/manager-web/sql/changes-1.8.0.sql
+++ b/inlong-manager/manager-web/sql/changes-1.8.0.sql
@@ -119,6 +119,7 @@ ALTER TABLE `workflow_process`
 ALTER TABLE `workflow_task`
     ADD `tenant` VARCHAR(256) DEFAULT 'public' NOT NULL comment 'Inlong tenant of workflow task' after `display_name`;
 
+-- Alter heartbeat related tables
 ALTER TABLE component_heartbeat DROP COLUMN id;
 ALTER TABLE `component_heartbeat` ADD PRIMARY KEY (`component`, `instance`);
 DROP INDEX unique_component_heartbeat on component_heartbeat;
@@ -130,3 +131,25 @@ DROP INDEX unique_group_heartbeat on group_heartbeat;
 ALTER TABLE stream_heartbeat DROP COLUMN id;
 ALTER TABLE `stream_heartbeat` ADD PRIMARY KEY (`component`, `instance`, `inlong_group_id`, `inlong_stream_id`);
 DROP INDEX unique_stream_heartbeat on stream_heartbeat;
+
+-- Create audit_source table
+CREATE TABLE IF NOT EXISTS `audit_source`
+(
+    `id`          int(11)      NOT NULL AUTO_INCREMENT,
+    `name`        varchar(128) NOT NULL COMMENT 'Audit source name',
+    `type`        varchar(20)  NOT NULL COMMENT 'Audit source type, including: MYSQL, CLICKHOUSE, ELASTICSEARCH',
+    `url`         varchar(256) NOT NULL COMMENT 'Audit source URL, for MYSQL or CLICKHOUSE, is jdbcUrl, and for ELASTICSEARCH is the access URL with hostname:port',
+    `enable_auth` tinyint(1)            DEFAULT '1' COMMENT 'Enable auth or not, 0: disable, 1: enable',
+    `username`    varchar(128)          COMMENT 'Audit source username, needed if enable_auth is 1',
+    `token`       varchar(512)          DEFAULT NULL COMMENT 'Audit source token, needed if enable_auth is 1',
+    `status`      smallint(4)  NOT NULL DEFAULT '1' COMMENT 'Whether the audit source is online or offline, 0: offline, 1: online',
+    `is_deleted`  int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `creator`     varchar(64)  NOT NULL COMMENT 'Creator name',
+    `modifier`    varchar(64)  NOT NULL COMMENT 'Modifier name',
+    `create_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `version`     int(11)      NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_audit_source` (`url`, `is_deleted`)
+) ENGINE = InnoDB
+  DEFAULT CHARSET = utf8mb4 COMMENT ='Audit source table';
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/AuditController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/AuditController.java
index 1e76796cf2..4addf6514a 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/AuditController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/AuditController.java
@@ -18,14 +18,18 @@
 package org.apache.inlong.manager.web.controller;
 
 import org.apache.inlong.manager.pojo.audit.AuditRequest;
+import org.apache.inlong.manager.pojo.audit.AuditSourceRequest;
+import org.apache.inlong.manager.pojo.audit.AuditSourceResponse;
 import org.apache.inlong.manager.pojo.audit.AuditVO;
 import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.user.LoginUserUtils;
 import org.apache.inlong.manager.service.core.AuditService;
 
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Lazy;
+import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
@@ -59,4 +63,17 @@ public class AuditController {
         return Response.success(auditService.refreshBaseItemCache());
     }
 
+    @ApiOperation(value = "Update the audit source")
+    @PostMapping(value = "/audit/updateSource")
+    public Response<Integer> updateAuditSource(@RequestBody AuditSourceRequest request) {
+        return Response.success(auditService.updateAuditSource(request, LoginUserUtils.getLoginUser().getName()));
+    }
+
+    @GetMapping(value = "/audit/getSource")
+    @ApiOperation(value = "Get the audit source")
+    public Response<AuditSourceResponse> getAuditSource() {
+        // TODO: support querying with more parameters
+        return Response.success(auditService.getAuditSource());
+    }
+
 }


Reply via email to