This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 33c64e8f8 [catalog] Introduce catalog managed (#3936)
33c64e8f8 is described below
commit 33c64e8f85261a699bc8e5e73bdad42784345897
Author: ouyangwulin <[email protected]>
AuthorDate: Wed Aug 7 15:35:42 2024 +0800
[catalog] Introduce catalog managed (#3936)
* [catalog] Introduce catalog managed
* [feature] Add the catalog type parameter specific definition.
* spotless
* revert vcs.xml
* fixed sonar cloud
* [impove] fixed sonar cloud issue
* [feature][catalog] add test from check catalogtype To db
---
.../streampark/common/enums/CatalogType.java | 26 ++++
.../streampark/console/base/util/JacksonUtils.java | 11 ++
.../console/core/bean/FlinkCatalogParams.java | 152 +++++++++++++++++++++
.../console/core/controller/CatalogController.java | 81 +++++++++++
.../console/core/entity/FlinkCatalog.java | 99 ++++++++++++++
.../console/core/mapper/CatalogMapper.java | 34 +++++
.../console/core/service/CatalogService.java | 58 ++++++++
.../core/service/impl/CatalogServiceImpl.java | 123 +++++++++++++++++
.../src/main/resources/db/schema-h2.sql | 15 ++
.../main/resources/mapper/core/CatalogMapper.xml | 53 +++++++
.../console/core/service/CatalogServiceTest.java | 119 ++++++++++++++++
11 files changed, 771 insertions(+)
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/enums/CatalogType.java
b/streampark-common/src/main/java/org/apache/streampark/common/enums/CatalogType.java
new file mode 100644
index 000000000..fb2caab55
--- /dev/null
+++
b/streampark-common/src/main/java/org/apache/streampark/common/enums/CatalogType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.common.enums;
+
+/** catalog type */
+public enum CatalogType {
+ JDBC,
+ HIVE,
+ PAIMON,
+ CUSTOM
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java
index c712c6edd..ee136c1b1 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java
@@ -22,10 +22,12 @@ import org.apache.streampark.common.util.DateUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.module.scala.DefaultScalaModule;
+import java.io.IOException;
import java.text.SimpleDateFormat;
/** Serialization utils */
@@ -56,4 +58,13 @@ public final class JacksonUtils {
public static String write(Object object) throws JsonProcessingException {
return MAPPER.writeValueAsString(object);
}
+
+ public static boolean isValidJson(String jsonStr) {
+ try {
+ JsonNode jsonNode = MAPPER.readTree(jsonStr);
+ return true;
+ } catch (IOException e) {
+ return false;
+ }
+ }
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkCatalogParams.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkCatalogParams.java
new file mode 100644
index 000000000..46a8210d8
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkCatalogParams.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.bean;
+
+import org.apache.streampark.common.enums.CatalogType;
+import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.core.entity.FlinkCatalog;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.springframework.beans.BeanUtils;
+
+import javax.validation.constraints.NotBlank;
+
+import java.io.Serializable;
+import java.util.Date;
+
+@Data
+@Slf4j
+public class FlinkCatalogParams implements Serializable {
+
+ private Long id;
+
+ private Long teamId;
+
+ private String catalogName;
+
+ private CatalogType catalogType;
+
+ /** creator */
+ private Long userId;
+
+ private Date createTime;
+
+ private Date updateTime;
+
+ private FlinkJDBCCatalog flinkJDBCCatalog;
+
+ private FlinkHiveCatalog flinkHiveCatalog;
+
+ private FlinkPaimonCatalog flinkPaimonCatalog;
+
+ private String customCatalogConfig;
+
+ public static FlinkCatalogParams of(FlinkCatalog flinkCatalog) {
+ if (flinkCatalog == null) {
+ return null;
+ }
+ FlinkCatalogParams flinkCatalogParams = new FlinkCatalogParams();
+ BeanUtils.copyProperties(flinkCatalog, flinkCatalogParams,
"configuration");
+ try {
+ switch (flinkCatalog.getCatalogType()) {
+ case JDBC:
+ flinkCatalogParams.setFlinkJDBCCatalog(
+ JacksonUtils.read(flinkCatalog.getConfiguration(),
FlinkJDBCCatalog.class));
+ break;
+ case HIVE:
+ flinkCatalogParams.setFlinkHiveCatalog(
+ JacksonUtils.read(flinkCatalog.getConfiguration(),
FlinkHiveCatalog.class));
+ break;
+ case PAIMON:
+ flinkCatalogParams.setFlinkPaimonCatalog(
+ JacksonUtils.read(flinkCatalog.getConfiguration(),
FlinkPaimonCatalog.class));
+ break;
+ case CUSTOM:
+
flinkCatalogParams.setCustomCatalogConfig(flinkCatalog.getConfiguration());
+ break;
+ }
+ } catch (JsonProcessingException e) {
+ log.error("Flink catalog params json read failed", e);
+ throw new RuntimeException("Flink catalog params json read
failed");
+ }
+
+ return flinkCatalogParams;
+ }
+
+ @Data
+ public static class FlinkJDBCCatalog implements Serializable {
+
+ @NotBlank
+ private String type;
+
+ @NotBlank
+ @JsonProperty("default-database")
+ private String defaultDatabase;
+
+ @NotBlank
+ private String username;
+ @NotBlank
+ private String password;
+
+ @NotBlank
+ @JsonProperty("base-url")
+ private String baseUrl;
+ }
+
+ @Data
+ public static class FlinkHiveCatalog implements Serializable {
+
+ @NotBlank
+ private String type;
+ @NotBlank
+ private String name;
+
+ @JsonProperty("hive-conf-dir")
+ private String hiveConfDir;
+
+ @JsonProperty("default-database")
+ private String defaultDatabase;
+
+ @JsonProperty("hive-version")
+ private String hiveVersion;
+
+ @JsonProperty("hadoop-conf-dir")
+ private String hadoopConfDir;
+ }
+
+ @Data
+ public static class FlinkPaimonCatalog implements Serializable {
+
+ @NotBlank
+ private String type;
+ @NotBlank
+ private String warehouse;
+ @NotBlank
+ private String metastore; // hive filesystem
+ private String uri;
+
+ @JsonProperty("hive-conf-dir")
+ private String hiveConfDir;
+
+ @JsonProperty("hadoop-conf-dir")
+ private String hadoopConfDir;
+ }
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/CatalogController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/CatalogController.java
new file mode 100644
index 000000000..78847211f
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/CatalogController.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.controller;
+
+import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.base.domain.RestResponse;
+import org.apache.streampark.console.core.annotation.Permission;
+import org.apache.streampark.console.core.bean.FlinkCatalogParams;
+import org.apache.streampark.console.core.service.CatalogService;
+import org.apache.streampark.console.core.util.ServiceHelper;
+
+import org.apache.shiro.authz.annotation.RequiresPermissions;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.io.IOException;
+
+@Slf4j
+@Validated
+@RestController
+@RequestMapping("flink/catalog")
+public class CatalogController {
+
+ @Autowired
+ CatalogService catalogService;
+
+ @Permission(team = "#catalog.teamId")
+ @PostMapping("create")
+ @RequiresPermissions("catalog:create")
+ public RestResponse create(FlinkCatalogParams catalog) throws IOException {
+ Long userId = ServiceHelper.getUserId();
+ boolean saved = catalogService.create(catalog, userId);
+ return RestResponse.success(saved);
+ }
+
+ @PostMapping("list")
+ @Permission(team = "#app.teamId")
+ @RequiresPermissions("catalog:view")
+ public RestResponse list(FlinkCatalogParams catalog, RestRequest request) {
+ IPage<FlinkCatalogParams> catalogList = catalogService.page(catalog,
request);
+ return RestResponse.success(catalogList);
+ }
+
+ @PostMapping("delete")
+ @Permission(team = "#app.teamId")
+ @RequiresPermissions("catalog:delete")
+ public RestResponse remove(FlinkCatalogParams catalog, RestRequest
request) {
+ boolean deleted = catalogService.remove(catalog.getId());
+ return RestResponse.success(deleted);
+ }
+
+ @PostMapping("update")
+ @Permission(team = "#app.teamId")
+ @RequiresPermissions("catalog:update")
+ public RestResponse remove(FlinkCatalogParams catalog) {
+ Long userId = ServiceHelper.getUserId();
+ boolean updated = catalogService.update(catalog, userId);
+ return RestResponse.success(updated);
+ }
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCatalog.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCatalog.java
new file mode 100644
index 000000000..b8de5a2e5
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCatalog.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.entity;
+
+import org.apache.streampark.common.enums.CatalogType;
+import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.core.bean.FlinkCatalogParams;
+
+import com.baomidou.mybatisplus.annotation.FieldStrategy;
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.BeanUtils;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/** catalog store */
+@Data
+@TableName("t_flink_catalog")
+@Slf4j
+public class FlinkCatalog implements Serializable {
+
+ @TableId(type = IdType.AUTO)
+ private Long id;
+
+ private Long teamId;
+
+ private String catalogName;
+
+ private CatalogType catalogType;
+
+ private String configuration;
+
+ @TableField(updateStrategy = FieldStrategy.IGNORED)
+ private Date createTime;
+
+ private Date updateTime;
+
+ /** creator */
+ private Long userId;
+
+ public static FlinkCatalog of(FlinkCatalogParams flinkCatalogParams) {
+ if (flinkCatalogParams == null)
+ return null;
+ FlinkCatalog flinkCatalog = new FlinkCatalog();
+
+ BeanUtils.copyProperties(
+ flinkCatalogParams,
+ flinkCatalog,
+ "flinkJDBCCatalog",
+ "flinkHiveCatalog",
+ "flinkPaimonCatalog",
+ "customCatalogConfig");
+
+ try {
+ switch (flinkCatalogParams.getCatalogType()) {
+ case JDBC:
+ flinkCatalog.setConfiguration(
+
JacksonUtils.write(flinkCatalogParams.getFlinkJDBCCatalog()));
+ break;
+ case HIVE:
+ flinkCatalog.setConfiguration(
+
JacksonUtils.write(flinkCatalogParams.getFlinkHiveCatalog()));
+ break;
+ case PAIMON:
+ flinkCatalog.setConfiguration(
+
JacksonUtils.write(flinkCatalogParams.getFlinkPaimonCatalog()));
+ break;
+ case CUSTOM:
+
flinkCatalog.setConfiguration(flinkCatalogParams.getCustomCatalogConfig());
+ break;
+ }
+ } catch (JsonProcessingException e) {
+ log.error("Flink catalog json read failed", e);
+ throw new RuntimeException("Flink catalog json read failed");
+ }
+ return flinkCatalog;
+ }
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/CatalogMapper.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/CatalogMapper.java
new file mode 100644
index 000000000..7053bee5b
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/CatalogMapper.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.mapper;
+
+import org.apache.streampark.console.core.entity.FlinkCatalog;
+
+import org.apache.ibatis.annotations.Param;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+
+/** catalog mapper */
+public interface CatalogMapper extends BaseMapper<FlinkCatalog> {
+
+ boolean existsByCatalogName(@Param("catalogName") String catalogName);
+
+ IPage<FlinkCatalog> selectPage(Page<FlinkCatalog> page, @Param("catalog")
FlinkCatalog catalog);
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/CatalogService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/CatalogService.java
new file mode 100644
index 000000000..c19d467b4
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/CatalogService.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.core.bean.FlinkCatalogParams;
+import org.apache.streampark.console.core.entity.FlinkCatalog;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.service.IService;
+
+/** This interface is use to managed catalog */
+public interface CatalogService extends IService<FlinkCatalog> {
+
+ /**
+ * Create Catalog
+ *
+ * @param catalog The {@link FlinkCatalogParams} object containing the
search criteria.
+ */
+ boolean create(FlinkCatalogParams catalog, Long userId);
+
+ /**
+ * Remove Catalog
+ *
+ * @param id The {@link FlinkCatalogParams} object containing the search
criteria.
+ */
+ boolean remove(Long id);
+
+ /**
+ * Retrieves a page of applications based on the provided parameters.
Params: catalog – The
+ * Catalog object to be used for filtering the results. request – The REST
request object
+ * containing additional parameters or headers. Returns: A page of Catalog
objects based on the
+ * provided parameters.
+ */
+ IPage<FlinkCatalogParams> page(FlinkCatalogParams catalog, RestRequest
request);
+
+ /**
+ * update Catalog
+ *
+ * @param catalog The {@link FlinkCatalogParams} object containing the
search criteria.
+ */
+ boolean update(FlinkCatalogParams catalog, long userId);
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/CatalogServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/CatalogServiceImpl.java
new file mode 100644
index 000000000..199220fb0
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/CatalogServiceImpl.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service.impl;
+
+import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.base.exception.AlertException;
+import org.apache.streampark.console.base.exception.ApiAlertException;
+import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
+import org.apache.streampark.console.core.bean.FlinkCatalogParams;
+import org.apache.streampark.console.core.entity.FlinkCatalog;
+import org.apache.streampark.console.core.mapper.CatalogMapper;
+import org.apache.streampark.console.core.service.CatalogService;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.BeanUtils;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/** catalog manage */
+@Service
+@Slf4j
+@Transactional(propagation = Propagation.SUPPORTS, rollbackFor =
Exception.class)
+public class CatalogServiceImpl extends ServiceImpl<CatalogMapper,
FlinkCatalog>
+ implements
+ CatalogService {
+
+ private static final String CATALOG_REGEX =
"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$";
+
+ @Override
+ public boolean create(FlinkCatalogParams catalog, Long userId) {
+ AlertException.throwIfNull(
+ catalog.getTeamId(), "The teamId can't be null. Create catalog
failed.");
+ AlertException.throwIfFalse(
+ validateCatalogName(catalog.getCatalogName()),
+ "Catalog Name only lowercase letters, numbers, and -,.. Symbol
composition, cannot end with a symbol.");
+ AlertException.throwIfTrue(
+ existsByCatalogName(catalog.getCatalogName()), "Catalog name
already exists.");
+ FlinkCatalog flinkCatalog = FlinkCatalog.of(catalog);
+ Date date = new Date();
+ flinkCatalog.setCreateTime(date);
+ flinkCatalog.setUpdateTime(date);
+ return this.save(flinkCatalog);
+ }
+
+ @Override
+ public boolean remove(Long id) {
+ FlinkCatalog catalog = getById(id);
+ ApiAlertException.throwIfNull(catalog, "Catalog not exist, please
check.");
+ return this.removeById(id);
+ }
+
+ @Override
+ public IPage<FlinkCatalogParams> page(FlinkCatalogParams catalog,
RestRequest request) {
+ AlertException.throwIfNull(
+ catalog.getTeamId(), "The teamId can't be null. List catalog
failed.");
+
+ Page<FlinkCatalog> page = MybatisPager.getPage(request);
+ this.baseMapper.selectPage(page, FlinkCatalog.of(catalog));
+ Page<FlinkCatalogParams> paramsPage = new Page<>();
+ BeanUtils.copyProperties(page, paramsPage, "records");
+ List<FlinkCatalogParams> paramList = new ArrayList<>();
+ page.getRecords()
+ .forEach(
+ record -> {
+ paramList.add(FlinkCatalogParams.of(record));
+ });
+ paramsPage.setRecords(paramList);
+ return paramsPage;
+ }
+
+ @Override
+ public boolean update(FlinkCatalogParams catalogParam, long userId) {
+ AlertException.throwIfNull(
+ catalogParam.getTeamId(), "The teamId can't be null. List catalog
failed.");
+ FlinkCatalog catalog = getById(catalogParam.getId());
+ FlinkCatalog flinkCatalog = FlinkCatalog.of(catalogParam);
+ AlertException.throwIfFalse(
+
catalogParam.getCatalogName().equalsIgnoreCase(catalog.getCatalogName()),
+ "The catalog name cannot be modified.");
+ log.debug(
+ "Catalog {} has modify from {} to {}",
+ catalog.getCatalogName(),
+ catalog.getConfiguration(),
+ flinkCatalog.getConfiguration());
+ catalog.setConfiguration(flinkCatalog.getConfiguration());
+ catalog.setUpdateTime(new Date());
+ catalog.setUserId(userId);
+ return this.updateById(catalog);
+ }
+
+ public Boolean existsByCatalogName(String catalogName) {
+ return this.baseMapper.existsByCatalogName(catalogName);
+ }
+
+ /** validate catalog name */
+ private boolean validateCatalogName(String catalogName) {
+ return Pattern.compile(CATALOG_REGEX).matcher(catalogName).matches();
+ }
+}
diff --git
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index e3fca631f..037a4ab0e 100644
---
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -575,3 +575,18 @@ create table if not exists `t_spark_app` (
`hadoop_user` varchar(500) default null,
primary key(`id`)
);
+
+ -- ----------------------------
+ -- Table structure for t_flink_app
+ -- ----------------------------
+ create table if not exists t_flink_catalog (
+ `id` BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1),
+ `team_id` bigint not null,
+ `user_id` bigint default null,
+ `catalog_type` varchar(255) not NULL,
+ `catalog_name` VARCHAR(255) NOT NULL,
+ `configuration` text,
+ `create_time` TIMESTAMP WITHOUT TIME ZONE DEFAULT NULL,
+ `update_time` TIMESTAMP WITHOUT TIME ZONE DEFAULT NULL,
+ CONSTRAINT uniq_catalog_name UNIQUE (`catalog_name`)
+ );
diff --git
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/CatalogMapper.xml
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/CatalogMapper.xml
new file mode 100644
index 000000000..9d1ffde45
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/CatalogMapper.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="org.apache.streampark.console.core.mapper.CatalogMapper">
+ <resultMap id="BaseResultMap"
type="org.apache.streampark.console.core.entity.FlinkCatalog">
+ <id column="id" jdbcType="BIGINT" property="id"/>
+ <result column="team_id" jdbcType="BIGINT" property="teamId"/>
+ <result column="catalog_type" jdbcType="VARCHAR"
property="catalogType"/>
+ <result column="catalog_name" jdbcType="VARCHAR"
property="catalogName"/>
+ <result column="configuration" jdbcType="LONGVARCHAR"
property="configuration"/>
+ <result column="createTime" jdbcType="DATE" property="createTime"/>
+ <result column="updateTime" jdbcType="DATE" property="updateTime"/>
+ <result column="user_Id" jdbcType="LONGVARCHAR" property="userId"/>
+ </resultMap>
+
+ <select id="existsByCatalogName" resultType="java.lang.Boolean"
parameterType="java.lang.String">
+ select
+ CASE
+ WHEN count(1) > 0 THEN true ELSE false
+ END
+ from t_flink_catalog
+ where catalog_name = #{catalogName}
+ limit 1
+ </select>
+
+ <select id="selectPage"
resultType="org.apache.streampark.console.core.entity.FlinkCatalog"
parameterType="org.apache.streampark.console.core.entity.FlinkCatalog">
+ select * from t_flink_catalog
+ <where>
+ team_id = #{catalog.teamId}
+ <if test="catalog.catalogName != null and catalog.catalogName !=
''">
+ and catalog_name = #{catalog.catalogName}
+ </if>
+ <if test="catalog.userId != null and catalog.userId != ''">
+ and user_id = #{catalog.userId}
+ </if>
+ </where>
+ </select>
+</mapper>
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/CatalogServiceTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/CatalogServiceTest.java
new file mode 100644
index 000000000..23decd3bd
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/CatalogServiceTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.common.enums.CatalogType;
+import org.apache.streampark.console.SpringUnitTestBase;
+import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.core.bean.FlinkCatalogParams;
+
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** CatalogService Tests */
+public class CatalogServiceTest extends SpringUnitTestBase {
+
+ @Autowired
+ private CatalogService catalogService;
+
+ @AfterEach
+ void cleanTestRecordsInDatabase() {
+ catalogService.remove(new QueryWrapper<>());
+ }
+
+ @Test
+ @Order(1)
+ public void create() {
+ FlinkCatalogParams catalog = new FlinkCatalogParams();
+ catalog.setTeamId(1L);
+ catalog.setCatalogType(CatalogType.JDBC);
+ catalog.setCatalogName("catalog-name");
+ FlinkCatalogParams.FlinkJDBCCatalog flinkJDBCCatalog =
+ new FlinkCatalogParams.FlinkJDBCCatalog();
+ flinkJDBCCatalog.setType("jdbc");
+ flinkJDBCCatalog.setDefaultDatabase("aa");
+ flinkJDBCCatalog.setPassword("11");
+ flinkJDBCCatalog.setUsername("user");
+ flinkJDBCCatalog.setBaseUrl("url");
+ catalog.setFlinkJDBCCatalog(flinkJDBCCatalog);
+
+ boolean create = catalogService.create(catalog, 1L);
+ assertThat(create).isTrue();
+ }
+
+ @Test
+ @Order(2)
+ public void update() {
+ FlinkCatalogParams catalog = new FlinkCatalogParams();
+ catalog.setTeamId(1L);
+ catalog.setCatalogType(CatalogType.JDBC);
+ catalog.setCatalogName("catalog-name");
+ FlinkCatalogParams.FlinkJDBCCatalog flinkJDBCCatalog =
+ new FlinkCatalogParams.FlinkJDBCCatalog();
+ flinkJDBCCatalog.setType("jdbc");
+ flinkJDBCCatalog.setDefaultDatabase("aa");
+ flinkJDBCCatalog.setPassword("11");
+ flinkJDBCCatalog.setUsername("user");
+ flinkJDBCCatalog.setBaseUrl("url1");
+ catalog.setFlinkJDBCCatalog(flinkJDBCCatalog);
+ RestRequest request = new RestRequest();
+ catalogService.create(catalog, 1L);
+
+ IPage<FlinkCatalogParams> catalogIPage = catalogService.page(catalog,
request);
+ FlinkCatalogParams catalogs = catalogIPage.getRecords().get(0);
+ catalogs.getFlinkJDBCCatalog().setBaseUrl("url2");
+ catalogService.update(catalogs, 2L);
+
+ IPage<FlinkCatalogParams> catalogResult = catalogService.page(catalog,
request);
+
+ assertThat(
+
catalogResult.getRecords().get(0).getFlinkJDBCCatalog().getBaseUrl().contains("url2"))
+ .isTrue();
+
assertThat(catalogResult.getRecords().get(0).getUserId().equals(2L)).isTrue();
+
assertThat(catalogResult.getRecords().get(0).getCatalogType().equals(CatalogType.JDBC))
+ .isTrue();
+ }
+
+ @Test
+ @Order(3)
+ public void remove() {
+ FlinkCatalogParams catalog = new FlinkCatalogParams();
+ catalog.setTeamId(1L);
+ catalog.setCatalogType(CatalogType.JDBC);
+ catalog.setCatalogName("catalog-name");
+ FlinkCatalogParams.FlinkJDBCCatalog flinkJDBCCatalog =
+ new FlinkCatalogParams.FlinkJDBCCatalog();
+ flinkJDBCCatalog.setType("jdbc");
+ flinkJDBCCatalog.setDefaultDatabase("aa");
+ flinkJDBCCatalog.setPassword("11");
+ flinkJDBCCatalog.setUsername("user");
+ flinkJDBCCatalog.setBaseUrl("url");
+ catalog.setFlinkJDBCCatalog(flinkJDBCCatalog);
+ catalogService.create(catalog, 1L);
+ RestRequest request = new RestRequest();
+ IPage<FlinkCatalogParams> catalogIPage = catalogService.page(catalog,
request);
+ boolean deleted =
catalogService.remove(catalogIPage.getRecords().get(0).getId());
+ assertThat(deleted).isTrue();
+ }
+}