This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 87830ee4d7 [INLONG-8405][Manager] Dynamically configure ClickHouse
source for Audit (#8424)
87830ee4d7 is described below
commit 87830ee4d7a03f249925efac13320f274d2d46db
Author: megru <[email protected]>
AuthorDate: Sat Jul 8 12:18:00 2023 +0800
[INLONG-8405][Manager] Dynamically configure ClickHouse source for Audit
(#8424)
* [INLONG-8405][Manager] Dynamically configure ClickHouse source for Audit
* [INLONG-8405][Manager] Optimize the DDL of audit_source table
* [INLONG-8405][Manager] Refactor the audit source related interface
---------
Co-authored-by: healchow <[email protected]>
---
.../manager/dao/entity/AuditSourceEntity.java | 46 +++++++++++
.../dao/mapper/AuditSourceEntityMapper.java | 37 +++++++++
.../resources/mappers/AuditSourceEntityMapper.xml | 71 +++++++++++++++++
.../manager/pojo/audit/AuditSourceRequest.java | 66 ++++++++++++++++
.../manager/pojo/audit/AuditSourceResponse.java | 73 +++++++++++++++++
.../inlong/manager/service/core/AuditService.java | 19 +++++
.../service/core/impl/AuditServiceImpl.java | 54 ++++++++++++-
.../service/resource/sink/ck/ClickHouseConfig.java | 91 ++++++++++++++--------
.../service/core/impl/AuditServiceTest.java | 29 ++++++-
.../main/resources/h2/apache_inlong_manager.sql | 23 ++++++
.../manager-web/sql/apache_inlong_manager.sql | 24 ++++++
inlong-manager/manager-web/sql/changes-1.8.0.sql | 23 ++++++
.../manager/web/controller/AuditController.java | 17 ++++
13 files changed, 537 insertions(+), 36 deletions(-)
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/AuditSourceEntity.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/AuditSourceEntity.java
new file mode 100644
index 0000000000..752e790ce6
--- /dev/null
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/AuditSourceEntity.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.dao.entity;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * Audit source entity, one instance maps to one row of the audit_source table.
+ */
+@Data
+public class AuditSourceEntity implements Serializable {
+
+ // Explicit version of the serialized form, so it does not depend on the compiler-generated default
+ private static final long serialVersionUID = 1L;
+
+ // Primary key, auto incremented by the database
+ private Integer id;
+ // Audit source name
+ private String name;
+ // Audit source type, including: MYSQL, CLICKHOUSE, ELASTICSEARCH
+ private String type;
+ // For MYSQL or CLICKHOUSE it is the jdbcUrl, for ELASTICSEARCH it is the access URL with hostname:port
+ private String url;
+ // Enable auth or not, 0: disable, 1: enable
+ private Integer enableAuth;
+ // Username, needed if enableAuth is 1
+ private String username;
+ // Token, needed if enableAuth is 1
+ private String token;
+ // Whether the audit source is online or offline, 0: offline, 1: online
+ private Integer status;
+ // Whether to delete, 0: not deleted, > 0: deleted
+ private Integer isDeleted;
+ private String creator;
+ private String modifier;
+ private Date createTime;
+ private Date modifyTime;
+ // Version number of the record
+ private Integer version;
+
+}
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditSourceEntityMapper.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditSourceEntityMapper.java
new file mode 100644
index 0000000000..d496f8570f
--- /dev/null
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AuditSourceEntityMapper.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.dao.mapper;
+
+import org.apache.inlong.manager.dao.entity.AuditSourceEntity;
+
+import org.apache.ibatis.annotations.Param;
+import org.springframework.stereotype.Repository;
+
+/**
+ * Audit source mapper, its statements are defined in AuditSourceEntityMapper.xml
+ * and operate on the audit_source table.
+ */
+@Repository
+public interface AuditSourceEntityMapper {
+
+ /**
+ * Insert a new audit source record.
+ * The mapped statement enables useGeneratedKeys with keyProperty "id",
+ * so the generated key is written back to the id of the given record.
+ *
+ * @param record audit source to insert
+ * @return result of the insert statement, presumably the number of affected rows
+ */
+ int insert(AuditSourceEntity record);
+
+ /**
+ * Select one audit source whose status is 1 (online) and which is not deleted.
+ *
+ * @return the online audit source, or null if the query matched no record
+ */
+ AuditSourceEntity selectOnlineSource();
+
+ /**
+ * Set the status to 0 (offline) for the not deleted audit source with the given url.
+ *
+ * @param offlineUrl url of the audit source to offline
+ */
+ void offlineSourceByUrl(@Param("offlineUrl") String offlineUrl);
+
+}
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/AuditSourceEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/AuditSourceEntityMapper.xml
new file mode 100644
index 0000000000..da7d111782
--- /dev/null
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/AuditSourceEntityMapper.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper
namespace="org.apache.inlong.manager.dao.mapper.AuditSourceEntityMapper">
+ <!-- Maps the columns of the audit_source table to the properties of AuditSourceEntity. -->
+ <!-- The primary key is declared with the id element, so that MyBatis can identify the row. -->
+ <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.AuditSourceEntity">
+ <id column="id" jdbcType="INTEGER" property="id"/>
+ <result column="name" jdbcType="VARCHAR" property="name"/>
+ <result column="type" jdbcType="VARCHAR" property="type"/>
+ <result column="url" jdbcType="VARCHAR" property="url"/>
+ <result column="enable_auth" jdbcType="TINYINT" property="enableAuth"/>
+ <result column="username" jdbcType="VARCHAR" property="username"/>
+ <result column="token" jdbcType="VARCHAR" property="token"/>
+ <result column="status" jdbcType="SMALLINT" property="status"/>
+ <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
+ <result column="creator" jdbcType="VARCHAR" property="creator"/>
+ <result column="modifier" jdbcType="VARCHAR" property="modifier"/>
+ <result column="create_time" jdbcType="TIMESTAMP" property="createTime"/>
+ <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/>
+ <result column="version" jdbcType="INTEGER" property="version"/>
+ </resultMap>
+ <!-- All columns of the audit_source table, included by the selectOnlineSource statement -->
+ <sql id="Base_Column_List">
+ id, name, type, url, enable_auth, username, token, status, is_deleted,
+ creator, modifier, create_time, modify_time, version
+ </sql>
+
+ <!-- Insert one audit source, the generated key is written back to the id property of the entity. -->
+ <!-- is_deleted, create_time, modify_time and version are not listed, so they take the column defaults. -->
+ <insert id="insert" useGeneratedKeys="true" keyProperty="id"
+
parameterType="org.apache.inlong.manager.dao.entity.AuditSourceEntity">
+ insert into audit_source (id, name, type, url,
+ enable_auth, username, token,
+ status, creator, modifier)
+ values (#{id, jdbcType=INTEGER}, #{name, jdbcType=VARCHAR},
#{type,jdbcType=VARCHAR}, #{url, jdbcType=VARCHAR},
+ #{enableAuth, jdbcType=TINYINT}, #{username,
jdbcType=VARCHAR}, #{token, jdbcType=VARCHAR},
+ #{status, jdbcType=INTEGER}, #{creator, jdbcType=VARCHAR},
#{modifier, jdbcType=VARCHAR})
+ </insert>
+
+ <!-- Select one audit source that is online (status = 1) and not deleted. -->
+ <!-- Only resultMap is declared, as resultType and resultMap must not be used together. -->
+ <!-- NOTE(review): there is no order by, so the returned row is not defined if several sources are online. -->
+ <select id="selectOnlineSource" resultMap="BaseResultMap">
+ select
+ <include refid="Base_Column_List"></include>
+ from audit_source
+ where status = 1
+ and is_deleted = 0
+ limit 1
+ </select>
+
+ <!-- Offline (status = 0) the not deleted audit source which has the given url -->
+ <update id="offlineSourceByUrl">
+ update audit_source
+ set status = 0
+ where url = #{offlineUrl, jdbcType=VARCHAR}
+ and is_deleted = 0
+ </update>
+
+</mapper>
\ No newline at end of file
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditSourceRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditSourceRequest.java
new file mode 100644
index 0000000000..63e0eedecd
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditSourceRequest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.audit;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import javax.validation.constraints.NotBlank;
+
+/**
+ * Audit source request, describes the audit source to save,
+ * and optionally the url of an old audit source to offline.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Audit source request")
+public class AuditSourceRequest {
+
+ // The name is validated with NotBlank, so it is marked as required like the type and the url
+ @NotBlank
+ @ApiModelProperty(value = "Audit source name", required = true)
+ private String name;
+
+ @NotBlank
+ @ApiModelProperty(value = "Audit source type, including: MYSQL, CLICKHOUSE, ELASTICSEARCH", required = true)
+ private String type;
+
+ @NotBlank
+ @ApiModelProperty(value = "Audit source URL, for MYSQL or CLICKHOUSE, is jdbcUrl, and for ELASTICSEARCH is the access URL with hostname:port", required = true)
+ private String url;
+
+ @ApiModelProperty(value = "Offline the url if not null")
+ private String offlineUrl;
+
+ @ApiModelProperty(value = "Enable auth or not, 0: disable, 1: enable")
+ private Integer enableAuth;
+
+ @ApiModelProperty(value = "Audit source username, needed if enableAuth is 1")
+ private String username;
+
+ @ApiModelProperty(value = "Audit source token, needed if enableAuth is 1")
+ private String token;
+
+ @ApiModelProperty(value = "Version number")
+ private Integer version;
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditSourceResponse.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditSourceResponse.java
new file mode 100644
index 0000000000..85125d4033
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditSourceResponse.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.audit;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+import java.util.Date;
+
+/**
+ * Audit source response, describes one audit source saved in the audit_source table.
+ */
+@Data
+@ApiModel("Audit source response")
+public class AuditSourceResponse {
+
+ @ApiModelProperty(value = "Primary key")
+ private Integer id;
+
+ @ApiModelProperty(value = "Audit source name")
+ private String name;
+
+ @ApiModelProperty(value = "Audit source type, including: MYSQL, CLICKHOUSE, ELASTICSEARCH", required = true)
+ private String type;
+
+ @ApiModelProperty(value = "Audit source URL, for MYSQL or CLICKHOUSE, is jdbcUrl, and for ELASTICSEARCH is the access URL with hostname:port", required = true)
+ private String url;
+
+ // NOTE(review): AuditSourceEntity has no offlineUrl property, so this field looks never filled - confirm
+ @ApiModelProperty(value = "Offline the url if not null")
+ private String offlineUrl;
+
+ @ApiModelProperty(value = "Enable auth or not, 0: disable, 1: enable")
+ private Integer enableAuth;
+
+ @ApiModelProperty(value = "Audit source username, needed if enableAuth is 1")
+ private String username;
+
+ // NOTE(review): the token is a credential, confirm that it should be returned to the caller
+ @ApiModelProperty(value = "Audit source token, needed if enableAuth is 1")
+ private String token;
+
+ @ApiModelProperty(value = "Creator")
+ private String creator;
+
+ @ApiModelProperty(value = "Modifier")
+ private String modifier;
+
+ @ApiModelProperty(value = "Create time")
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date createTime;
+
+ @ApiModelProperty(value = "Modify time")
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date modifyTime;
+
+ @ApiModelProperty(value = "Version number")
+ private Integer version;
+
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java
index 1d2407a2c7..5068883b73 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java
@@ -18,6 +18,8 @@
package org.apache.inlong.manager.service.core;
import org.apache.inlong.manager.pojo.audit.AuditRequest;
+import org.apache.inlong.manager.pojo.audit.AuditSourceRequest;
+import org.apache.inlong.manager.pojo.audit.AuditSourceResponse;
import org.apache.inlong.manager.pojo.audit.AuditVO;
import java.util.List;
@@ -51,4 +53,21 @@ public interface AuditService {
*/
Boolean refreshBaseItemCache();
+ /**
+ * Offline the old audit source through the offline url of the request (if it is not blank),
+ * then insert a new audit source and set it online.
+ * <p/>
+ * NOTE(review): updating an existing record with the same url is still a TODO in AuditServiceImpl,
+ * the request is always saved with an insert operation.
+ *
+ * @param request audit source request
+ * @param operator current operator
+ * @return audit source id after saving
+ */
+ Integer updateAuditSource(AuditSourceRequest request, String operator);
+
+ /**
+ * Get audit source that is online.
+ * <p/>
+ * AuditServiceImpl throws a BusinessException with RECORD_NOT_FOUND if there is no online audit source.
+ *
+ * @return audit source response.
+ */
+ AuditSourceResponse getAuditSource();
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
index 1e08785c6e..1552b13aa8 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
@@ -17,21 +17,28 @@
package org.apache.inlong.manager.service.core.impl;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.AuditQuerySource;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.TimeStaticsDim;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.AuditBaseEntity;
+import org.apache.inlong.manager.dao.entity.AuditSourceEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.AuditBaseEntityMapper;
import org.apache.inlong.manager.dao.mapper.AuditEntityMapper;
+import org.apache.inlong.manager.dao.mapper.AuditSourceEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.pojo.audit.AuditInfo;
import org.apache.inlong.manager.pojo.audit.AuditRequest;
+import org.apache.inlong.manager.pojo.audit.AuditSourceRequest;
+import org.apache.inlong.manager.pojo.audit.AuditSourceResponse;
import org.apache.inlong.manager.pojo.audit.AuditVO;
import org.apache.inlong.manager.pojo.user.LoginUserUtils;
import org.apache.inlong.manager.pojo.user.UserRoleCode;
@@ -61,7 +68,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
import javax.annotation.PostConstruct;
@@ -113,7 +119,7 @@ public class AuditServiceImpl implements AuditService {
private List<String> auditIdListForUser;
@Value("${audit.query.source}")
- private String auditQuerySource = AuditQuerySource.MYSQL.name();
+ private String auditQuerySource;
@Autowired
private AuditBaseEntityMapper auditBaseMapper;
@@ -125,6 +131,10 @@ public class AuditServiceImpl implements AuditService {
private StreamSinkEntityMapper sinkEntityMapper;
@Autowired
private StreamSourceEntityMapper sourceEntityMapper;
+ @Autowired
+ private ClickHouseConfig config;
+ @Autowired
+ private AuditSourceEntityMapper auditSourceMapper;
@PostConstruct
public void initialize() {
@@ -137,7 +147,6 @@ public class AuditServiceImpl implements AuditService {
}
@Override
- @Transactional(rollbackFor = Exception.class)
public Boolean refreshBaseItemCache() {
LOGGER.debug("start to reload audit base item info");
try {
@@ -159,6 +168,41 @@ public class AuditServiceImpl implements AuditService {
return true;
}
+ /**
+ * Save the requested audit source as an online one.
+ * <p/>
+ * An old source is set offline first if the request carries a non-blank offline url,
+ * and the ClickHouse runtime config is refreshed after the new record was inserted.
+ * NOTE(review): no transaction is declared on this method, confirm whether offline and insert must be atomic.
+ *
+ * @param request audit source request
+ * @param operator current operator, saved as creator and modifier
+ * @return id of the inserted audit source
+ */
+ @Override
+ public Integer updateAuditSource(AuditSourceRequest request, String operator) {
+ final String urlToOffline = request.getOfflineUrl();
+ if (!StringUtils.isBlank(urlToOffline)) {
+ auditSourceMapper.offlineSourceByUrl(urlToOffline);
+ LOGGER.info("success offline the audit source with url: {}", urlToOffline);
+ }
+
+ // TODO check whether the url exists first: update the record if it exists, create it otherwise
+ AuditSourceEntity newSource = CommonBeanUtils.copyProperties(request, AuditSourceEntity::new);
+ newSource.setCreator(operator);
+ newSource.setModifier(operator);
+ newSource.setStatus(InlongConstants.DEFAULT_ENABLE_VALUE);
+ auditSourceMapper.insert(newSource);
+ final Integer sourceId = newSource.getId();
+ LOGGER.info("success to insert audit source with id={}", sourceId);
+
+ // TODO select the config that needs to be updated according to the source type
+ config.updateRuntimeConfig();
+ LOGGER.info("success to update audit source with id={}", sourceId);
+
+ return sourceId;
+ }
+
+ /**
+ * Query the online audit source and convert it to the response.
+ *
+ * @return the online audit source
+ * @throws BusinessException with RECORD_NOT_FOUND if the mapper returned no online audit source
+ */
+ @Override
+ public AuditSourceResponse getAuditSource() {
+ AuditSourceEntity onlineSource = auditSourceMapper.selectOnlineSource();
+ if (onlineSource != null) {
+ LOGGER.debug("success to get audit source, id={}", onlineSource.getId());
+ return CommonBeanUtils.copyProperties(onlineSource, AuditSourceResponse::new);
+ }
+
+ throw new BusinessException(ErrorCodeEnum.RECORD_NOT_FOUND);
+ }
+
@Override
public String getAuditId(String type, boolean isSent) {
if (StringUtils.isBlank(type)) {
@@ -249,9 +293,10 @@ public class AuditServiceImpl implements AuditService {
}
}
} else if (AuditQuerySource.CLICKHOUSE == querySource) {
- try (Connection connection =
ClickHouseConfig.getCkConnection();
+ try (Connection connection = config.getCkConnection();
PreparedStatement statement =
getAuditCkStatement(connection, groupId, streamId, auditId,
request.getStartDate(), request.getEndDate());
+
ResultSet resultSet = statement.executeQuery()) {
List<AuditInfo> auditSet = new ArrayList<>();
while (resultSet.next()) {
@@ -430,4 +475,5 @@ public class AuditServiceImpl implements AuditService {
}
return formatDateString;
}
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseConfig.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseConfig.java
index 8755fcaa39..72000ba421 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseConfig.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseConfig.java
@@ -17,59 +17,88 @@
package org.apache.inlong.manager.service.resource.sink.ck;
+import org.apache.inlong.manager.dao.entity.AuditSourceEntity;
+import org.apache.inlong.manager.dao.mapper.AuditSourceEntityMapper;
+
import com.alibaba.druid.pool.DruidDataSourceFactory;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
import javax.sql.DataSource;
import java.sql.Connection;
+import java.util.Objects;
import java.util.Properties;
/**
- * Clickhouse config information, including url, user, etc.
+ * ClickHouse config, including url, user, etc.
*/
-@Component
+@Slf4j
+@Service
public class ClickHouseConfig {
+ @Autowired
+ private AuditSourceEntityMapper auditSourceMapper;
private static volatile DataSource source;
+ private static volatile String currentJdbcUrl = null;
+ private static volatile String currentUsername = null;
+ private static volatile String currentPassword = null;
- private static String jdbcUrl;
-
- private static String username;
-
- private static String password;
+ /**
+ * Update the runtime config of ClickHouse connection.
+ * <p/>
+ * The online audit source is loaded from the audit_source table, and the data source is rebuilt
+ * only if there is no data source yet, or if the url, the username or the password differ from
+ * the values of the last successful build. Failures are logged and not thrown.
+ * <p/>
+ * NOTE(review): a replaced data source is not closed here, confirm whether it has to be released.
+ */
+ public synchronized void updateRuntimeConfig() {
+ try {
+ AuditSourceEntity auditSource = auditSourceMapper.selectOnlineSource();
+ if (auditSource == null) {
+ // nothing to connect to, keep the current data source as it is
+ log.warn("no online audit source was found, skip to update the ClickHouse config");
+ return;
+ }
+ String jdbcUrl = auditSource.getUrl();
+ String username = auditSource.getUsername();
+ String password = StringUtils.isBlank(auditSource.getToken()) ? "" : auditSource.getToken();
- @Value("${audit.ck.jdbcUrl}")
- public void setUrl(String jdbcUrl) {
- ClickHouseConfig.jdbcUrl = jdbcUrl;
- }
+ // the config changed if ANY of the values differs, a missing data source always needs a build
+ boolean changed = source == null
+ || !Objects.equals(currentJdbcUrl, jdbcUrl)
+ || !Objects.equals(currentUsername, username)
+ || !Objects.equals(currentPassword, password);
+ if (changed) {
- @Value("${audit.ck.username}")
- public void setUsername(String username) {
- ClickHouseConfig.username = username;
- }
+ Properties pros = new Properties();
+ pros.put("url", jdbcUrl);
+ if (StringUtils.isNotBlank(username)) {
+ pros.put("username", username);
+ }
+ if (StringUtils.isNotBlank(password)) {
+ pros.put("password", password);
+ }
- @Value("${audit.ck.password}")
- public void setPassword(String password) {
- ClickHouseConfig.password = password;
+ source = DruidDataSourceFactory.createDataSource(pros);
+ // remember the values only after the data source was created, so a failed build is retried
+ currentJdbcUrl = jdbcUrl;
+ currentUsername = username;
+ currentPassword = password;
+ log.info("success to create connection to {}", jdbcUrl);
+ }
+ } catch (Exception e) {
+ log.error("failed to read click house audit source: ", e);
+ }
 }
 /**
 * Get ClickHouse connection from data source
 */
- public static Connection getCkConnection() throws Exception {
+ public Connection getCkConnection() throws Exception {
+ log.debug("start to get connection for CLICKHOUSE");
+ // build the data source lazily from the online audit source, trying at most 3 times
+ int retry = 0;
+ while (source == null && retry < 3) {
+ updateRuntimeConfig();
+ retry += 1;
+ }
+
 if (source == null) {
- synchronized (ClickHouseConfig.class) {
- if (source == null) {
- Properties pros = new Properties();
- pros.put("url", jdbcUrl);
- pros.put("username", username);
- pros.put("password", password);
- source = DruidDataSourceFactory.createDataSource(pros);
- }
- }
+ // NOTE(review): null is returned instead of throwing, confirm that every caller handles a null connection
+ log.warn("jdbc source is null for CLICKHOUSE");
+ return null;
 }
- return source.getConnection();
+
+ Connection connection = source.getConnection();
+ log.info("success to get connection for CLICKHOUSE");
+ return connection;
 }
}
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java
index 010a1bd38a..7909b0e012 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.core.impl;
import org.apache.inlong.manager.pojo.audit.AuditInfo;
import org.apache.inlong.manager.pojo.audit.AuditRequest;
+import org.apache.inlong.manager.pojo.audit.AuditSourceRequest;
import org.apache.inlong.manager.pojo.audit.AuditVO;
import org.apache.inlong.manager.service.ServiceBaseTest;
import org.apache.inlong.manager.service.core.AuditService;
@@ -41,7 +42,7 @@ class AuditServiceTest extends ServiceBaseTest {
private AuditService auditService;
@Test
- void testQueryFromMySQL() throws IOException {
+ void testQueryFromMySQL() {
AuditRequest request = new AuditRequest();
request.setAuditIds(Arrays.asList("3", "4"));
request.setInlongGroupId("g1");
@@ -75,4 +76,30 @@ class AuditServiceTest extends ServiceBaseTest {
request.setEndDate("2022-01-01");
Assertions.assertNotNull(auditService.listByCondition(request));
}
+
+ /**
+ * Save two audit sources in turn, where the second request offlines the url of the first one,
+ * then check that the online audit source is the second one.
+ */
+ @Test
+ void testUpdateAuditSource() {
+ AuditSourceRequest request1 = AuditSourceRequest.builder()
+ .name("source_ch_1")
+ .type("CLICKHOUSE")
+ .url("jdbc:clickhouse://127.0.0.1:8123/db1")
+ .offlineUrl(null)
+ .enableAuth(0)
+ .build();
+ auditService.updateAuditSource(request1, GLOBAL_OPERATOR);
+
+ AuditSourceRequest request2 = AuditSourceRequest.builder()
+ .name("source_ch_2")
+ .type("CLICKHOUSE")
+ .url("jdbc:clickhouse://127.0.0.1:8123/db2")
+ .offlineUrl("jdbc:clickhouse://127.0.0.1:8123/db1")
+ .enableAuth(1)
+ .username("default")
+ .token("123456")
+ .build();
+ auditService.updateAuditSource(request2, GLOBAL_OPERATOR);
+
+ // the expected value goes first and the actual value second, so a failure message reads correctly
+ Assertions.assertEquals(request2.getUrl(), auditService.getAuditSource().getUrl());
+ }
+
}
diff --git
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index fe5f8d2d3d..a12c0dcba6 100644
---
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -846,6 +846,29 @@ VALUES ('audit_sdk_collect', 'SDK', 0, '1'),
('audit_sort_postgres_input', 'POSTGRESQL', 0, '27'),
('audit_sort_postgres_output', 'POSTGRESQL', 1, '28');
+-- ----------------------------
+-- Table structure for audit_source
+-- The column comments refer to the enable_auth column by its real name
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `audit_source`
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT,
+ `name` varchar(128) NOT NULL COMMENT 'Audit source name',
+ `type` varchar(20) NOT NULL COMMENT 'Audit source type, including: MYSQL, CLICKHOUSE, ELASTICSEARCH',
+ `url` varchar(256) NOT NULL COMMENT 'Audit source URL, for MYSQL or CLICKHOUSE, is jdbcUrl, and for ELASTICSEARCH is the access URL with hostname:port',
+ `enable_auth` tinyint(1) DEFAULT '1' COMMENT 'Enable auth or not, 0: disable, 1: enable',
+ `username` varchar(128) COMMENT 'Audit source username, needed if enable_auth is 1',
+ `token` varchar(512) DEFAULT NULL COMMENT 'Audit source token, needed if enable_auth is 1',
+ `status` smallint(4) NOT NULL DEFAULT '1' COMMENT 'Whether the audit source is online or offline, 0: offline, 1: online',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+ `creator` varchar(64) NOT NULL COMMENT 'Creator name',
+ `modifier` varchar(64) NOT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_audit_source` (`url`, `is_deleted`)
+);
+
-- ----------------------------
SET FOREIGN_KEY_CHECKS = 1;
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 3301ed56d7..c38e413f30 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -893,6 +893,30 @@ VALUES ('audit_sdk_collect', 'SDK', 0, '1'),
('audit_sort_postgres_input', 'POSTGRESQL', 0, '27'),
('audit_sort_postgres_output', 'POSTGRESQL', 1, '28');
+-- ----------------------------
+-- Table structure for audit_source
+-- The column comments refer to the enable_auth column by its real name
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `audit_source`
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT,
+ `name` varchar(128) NOT NULL COMMENT 'Audit source name',
+ `type` varchar(20) NOT NULL COMMENT 'Audit source type, including: MYSQL, CLICKHOUSE, ELASTICSEARCH',
+ `url` varchar(256) NOT NULL COMMENT 'Audit source URL, for MYSQL or CLICKHOUSE, is jdbcUrl, and for ELASTICSEARCH is the access URL with hostname:port',
+ `enable_auth` tinyint(1) DEFAULT '1' COMMENT 'Enable auth or not, 0: disable, 1: enable',
+ `username` varchar(128) COMMENT 'Audit source username, needed if enable_auth is 1',
+ `token` varchar(512) DEFAULT NULL COMMENT 'Audit source token, needed if enable_auth is 1',
+ `status` smallint(4) NOT NULL DEFAULT '1' COMMENT 'Whether the audit source is online or offline, 0: offline, 1: online',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+ `creator` varchar(64) NOT NULL COMMENT 'Creator name',
+ `modifier` varchar(64) NOT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_audit_source` (`url`, `is_deleted`)
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8mb4 COMMENT ='Audit source table';
+
-- ----------------------------
SET FOREIGN_KEY_CHECKS = 1;
diff --git a/inlong-manager/manager-web/sql/changes-1.8.0.sql
b/inlong-manager/manager-web/sql/changes-1.8.0.sql
index 51397ad8a5..edd0abb7ac 100644
--- a/inlong-manager/manager-web/sql/changes-1.8.0.sql
+++ b/inlong-manager/manager-web/sql/changes-1.8.0.sql
@@ -119,6 +119,7 @@ ALTER TABLE `workflow_process`
ALTER TABLE `workflow_task`
ADD `tenant` VARCHAR(256) DEFAULT 'public' NOT NULL comment 'Inlong tenant
of workflow task' after `display_name`;
+-- Alter heartbeat related tables
ALTER TABLE component_heartbeat DROP COLUMN id;
ALTER TABLE `component_heartbeat` ADD PRIMARY KEY (`component`, `instance`);
DROP INDEX unique_component_heartbeat on component_heartbeat;
@@ -130,3 +131,25 @@ DROP INDEX unique_group_heartbeat on group_heartbeat;
ALTER TABLE stream_heartbeat DROP COLUMN id;
ALTER TABLE `stream_heartbeat` ADD PRIMARY KEY (`component`, `instance`,
`inlong_group_id`, `inlong_stream_id`);
DROP INDEX unique_stream_heartbeat on stream_heartbeat;
+
+-- Create audit_source table
+CREATE TABLE IF NOT EXISTS `audit_source`
+(
+    `id`          int(11)      NOT NULL AUTO_INCREMENT,
+    `name`        varchar(128) NOT NULL COMMENT 'Audit source name',
+    `type`        varchar(20)  NOT NULL COMMENT 'Audit source type, including: MYSQL, CLICKHOUSE, ELASTICSEARCH',
+    `url`         varchar(256) NOT NULL COMMENT 'Audit source URL, for MYSQL or CLICKHOUSE, is jdbcUrl, and for ELASTICSEARCH is the access URL with hostname:port',
+    `enable_auth` tinyint(1)            DEFAULT '1' COMMENT 'Enable auth or not, 0: disable, 1: enable',
+    `username`    varchar(128) COMMENT 'Audit source username, needed if enable_auth is 1',
+    `token`       varchar(512)          DEFAULT NULL COMMENT 'Audit source token, needed if enable_auth is 1',
+    `status`      smallint(4)  NOT NULL DEFAULT '1' COMMENT 'Whether the audit source is online or offline, 0: offline, 1: online',
+    `is_deleted`  int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `creator`     varchar(64)  NOT NULL COMMENT 'Creator name',
+    `modifier`    varchar(64)  NOT NULL COMMENT 'Modifier name',
+    `create_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `version`     int(11)      NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_audit_source` (`url`, `is_deleted`)
+) ENGINE = InnoDB
+  DEFAULT CHARSET = utf8mb4 COMMENT ='Audit source table';
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/AuditController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/AuditController.java
index 1e76796cf2..4addf6514a 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/AuditController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/AuditController.java
@@ -18,14 +18,18 @@
package org.apache.inlong.manager.web.controller;
import org.apache.inlong.manager.pojo.audit.AuditRequest;
+import org.apache.inlong.manager.pojo.audit.AuditSourceRequest;
+import org.apache.inlong.manager.pojo.audit.AuditSourceResponse;
import org.apache.inlong.manager.pojo.audit.AuditVO;
import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.user.LoginUserUtils;
import org.apache.inlong.manager.service.core.AuditService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
+import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
@@ -59,4 +63,17 @@ public class AuditController {
return Response.success(auditService.refreshBaseItemCache());
}
+ /**
+  * Updates the audit source described by the request body.
+  * The name obtained from {@code LoginUserUtils.getLoginUser()} is passed to the
+  * service together with the request.
+  *
+  * @param request audit source info, bound from the HTTP request body
+  * @return success response wrapping the Integer returned by
+  *         {@code auditService.updateAuditSource}
+  *         (presumably the audit source id - confirm against AuditService)
+  */
+ @ApiOperation(value = "Update the audit source")
+ @PostMapping(value = "/audit/updateSource")
+ public Response<Integer> updateAuditSource(@RequestBody AuditSourceRequest
request) {
+ return Response.success(auditService.updateAuditSource(request,
LoginUserUtils.getLoginUser().getName()));
+ }
+
+ /**
+  * Returns the audit source supplied by {@code auditService.getAuditSource()}.
+  * The endpoint currently accepts no query parameters.
+  *
+  * @return success response wrapping the AuditSourceResponse from the service
+  */
+ @ApiOperation(value = "Get the audit source")
+ @GetMapping("/audit/getSource")
+ public Response<AuditSourceResponse> getAuditSource() {
+ // TODO support more parameters to query
+ return Response.success(auditService.getAuditSource());
+ }
+
}