This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 33c64e8f8 [catalog] Introduce catalog managed (#3936)
33c64e8f8 is described below

commit 33c64e8f85261a699bc8e5e73bdad42784345897
Author: ouyangwulin <[email protected]>
AuthorDate: Wed Aug 7 15:35:42 2024 +0800

    [catalog] Introduce catalog managed (#3936)
    
    * [catalog] Introduce catalog managed
    
    * [feature] Add the catalog type parameter specific definition.
    
    * spotless
    
    * revert vcs.xml
    
    * fixed sonar cloud
    
    * [improve] fixed sonar cloud issue
    
    * [feature][catalog] add test from check catalogtype To db
---
 .../streampark/common/enums/CatalogType.java       |  26 ++++
 .../streampark/console/base/util/JacksonUtils.java |  11 ++
 .../console/core/bean/FlinkCatalogParams.java      | 152 +++++++++++++++++++++
 .../console/core/controller/CatalogController.java |  81 +++++++++++
 .../console/core/entity/FlinkCatalog.java          |  99 ++++++++++++++
 .../console/core/mapper/CatalogMapper.java         |  34 +++++
 .../console/core/service/CatalogService.java       |  58 ++++++++
 .../core/service/impl/CatalogServiceImpl.java      | 123 +++++++++++++++++
 .../src/main/resources/db/schema-h2.sql            |  15 ++
 .../main/resources/mapper/core/CatalogMapper.xml   |  53 +++++++
 .../console/core/service/CatalogServiceTest.java   | 119 ++++++++++++++++
 11 files changed, 771 insertions(+)

diff --git 
a/streampark-common/src/main/java/org/apache/streampark/common/enums/CatalogType.java
 
b/streampark-common/src/main/java/org/apache/streampark/common/enums/CatalogType.java
new file mode 100644
index 000000000..fb2caab55
--- /dev/null
+++ 
b/streampark-common/src/main/java/org/apache/streampark/common/enums/CatalogType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.common.enums;
+
+/** catalog type */
+public enum CatalogType {
+    JDBC,
+    HIVE,
+    PAIMON,
+    CUSTOM
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java
index c712c6edd..ee136c1b1 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java
@@ -22,10 +22,12 @@ import org.apache.streampark.common.util.DateUtils;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
 import com.fasterxml.jackson.module.scala.DefaultScalaModule;
 
+import java.io.IOException;
 import java.text.SimpleDateFormat;
 
 /** Serialization utils */
@@ -56,4 +58,13 @@ public final class JacksonUtils {
     public static String write(Object object) throws JsonProcessingException {
         return MAPPER.writeValueAsString(object);
     }
+
+    public static boolean isValidJson(String jsonStr) {
+        try {
+            JsonNode jsonNode = MAPPER.readTree(jsonStr);
+            return true;
+        } catch (IOException e) {
+            return false;
+        }
+    }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkCatalogParams.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkCatalogParams.java
new file mode 100644
index 000000000..46a8210d8
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkCatalogParams.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.bean;
+
+import org.apache.streampark.common.enums.CatalogType;
+import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.core.entity.FlinkCatalog;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.springframework.beans.BeanUtils;
+
+import javax.validation.constraints.NotBlank;
+
+import java.io.Serializable;
+import java.util.Date;
+
+@Data
+@Slf4j
+public class FlinkCatalogParams implements Serializable {
+
+    private Long id;
+
+    private Long teamId;
+
+    private String catalogName;
+
+    private CatalogType catalogType;
+
+    /** creator */
+    private Long userId;
+
+    private Date createTime;
+
+    private Date updateTime;
+
+    private FlinkJDBCCatalog flinkJDBCCatalog;
+
+    private FlinkHiveCatalog flinkHiveCatalog;
+
+    private FlinkPaimonCatalog flinkPaimonCatalog;
+
+    private String customCatalogConfig;
+
+    public static FlinkCatalogParams of(FlinkCatalog flinkCatalog) {
+        if (flinkCatalog == null) {
+            return null;
+        }
+        FlinkCatalogParams flinkCatalogParams = new FlinkCatalogParams();
+        BeanUtils.copyProperties(flinkCatalog, flinkCatalogParams, 
"configuration");
+        try {
+            switch (flinkCatalog.getCatalogType()) {
+                case JDBC:
+                    flinkCatalogParams.setFlinkJDBCCatalog(
+                        JacksonUtils.read(flinkCatalog.getConfiguration(), 
FlinkJDBCCatalog.class));
+                    break;
+                case HIVE:
+                    flinkCatalogParams.setFlinkHiveCatalog(
+                        JacksonUtils.read(flinkCatalog.getConfiguration(), 
FlinkHiveCatalog.class));
+                    break;
+                case PAIMON:
+                    flinkCatalogParams.setFlinkPaimonCatalog(
+                        JacksonUtils.read(flinkCatalog.getConfiguration(), 
FlinkPaimonCatalog.class));
+                    break;
+                case CUSTOM:
+                    
flinkCatalogParams.setCustomCatalogConfig(flinkCatalog.getConfiguration());
+                    break;
+            }
+        } catch (JsonProcessingException e) {
+            log.error("Flink catalog params json read failed", e);
+            throw new RuntimeException("Flink catalog params json read 
failed");
+        }
+
+        return flinkCatalogParams;
+    }
+
+    @Data
+    public static class FlinkJDBCCatalog implements Serializable {
+
+        @NotBlank
+        private String type;
+
+        @NotBlank
+        @JsonProperty("default-database")
+        private String defaultDatabase;
+
+        @NotBlank
+        private String username;
+        @NotBlank
+        private String password;
+
+        @NotBlank
+        @JsonProperty("base-url")
+        private String baseUrl;
+    }
+
+    @Data
+    public static class FlinkHiveCatalog implements Serializable {
+
+        @NotBlank
+        private String type;
+        @NotBlank
+        private String name;
+
+        @JsonProperty("hive-conf-dir")
+        private String hiveConfDir;
+
+        @JsonProperty("default-database")
+        private String defaultDatabase;
+
+        @JsonProperty("hive-version")
+        private String hiveVersion;
+
+        @JsonProperty("hadoop-conf-dir")
+        private String hadoopConfDir;
+    }
+
+    @Data
+    public static class FlinkPaimonCatalog implements Serializable {
+
+        @NotBlank
+        private String type;
+        @NotBlank
+        private String warehouse;
+        @NotBlank
+        private String metastore; // hive filesystem
+        private String uri;
+
+        @JsonProperty("hive-conf-dir")
+        private String hiveConfDir;
+
+        @JsonProperty("hadoop-conf-dir")
+        private String hadoopConfDir;
+    }
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/CatalogController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/CatalogController.java
new file mode 100644
index 000000000..78847211f
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/CatalogController.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.controller;
+
+import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.base.domain.RestResponse;
+import org.apache.streampark.console.core.annotation.Permission;
+import org.apache.streampark.console.core.bean.FlinkCatalogParams;
+import org.apache.streampark.console.core.service.CatalogService;
+import org.apache.streampark.console.core.util.ServiceHelper;
+
+import org.apache.shiro.authz.annotation.RequiresPermissions;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.io.IOException;
+
+@Slf4j
+@Validated
+@RestController
+@RequestMapping("flink/catalog")
+public class CatalogController {
+
+    @Autowired
+    CatalogService catalogService;
+
+    @Permission(team = "#catalog.teamId")
+    @PostMapping("create")
+    @RequiresPermissions("catalog:create")
+    public RestResponse create(FlinkCatalogParams catalog) throws IOException {
+        Long userId = ServiceHelper.getUserId();
+        boolean saved = catalogService.create(catalog, userId);
+        return RestResponse.success(saved);
+    }
+
+    @PostMapping("list")
+    @Permission(team = "#app.teamId")
+    @RequiresPermissions("catalog:view")
+    public RestResponse list(FlinkCatalogParams catalog, RestRequest request) {
+        IPage<FlinkCatalogParams> catalogList = catalogService.page(catalog, 
request);
+        return RestResponse.success(catalogList);
+    }
+
+    @PostMapping("delete")
+    @Permission(team = "#app.teamId")
+    @RequiresPermissions("catalog:delete")
+    public RestResponse remove(FlinkCatalogParams catalog, RestRequest 
request) {
+        boolean deleted = catalogService.remove(catalog.getId());
+        return RestResponse.success(deleted);
+    }
+
+    @PostMapping("update")
+    @Permission(team = "#app.teamId")
+    @RequiresPermissions("catalog:update")
+    public RestResponse remove(FlinkCatalogParams catalog) {
+        Long userId = ServiceHelper.getUserId();
+        boolean updated = catalogService.update(catalog, userId);
+        return RestResponse.success(updated);
+    }
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCatalog.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCatalog.java
new file mode 100644
index 000000000..b8de5a2e5
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCatalog.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.entity;
+
+import org.apache.streampark.common.enums.CatalogType;
+import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.core.bean.FlinkCatalogParams;
+
+import com.baomidou.mybatisplus.annotation.FieldStrategy;
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.BeanUtils;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/** catalog store */
+@Data
+@TableName("t_flink_catalog")
+@Slf4j
+public class FlinkCatalog implements Serializable {
+
+    @TableId(type = IdType.AUTO)
+    private Long id;
+
+    private Long teamId;
+
+    private String catalogName;
+
+    private CatalogType catalogType;
+
+    private String configuration;
+
+    @TableField(updateStrategy = FieldStrategy.IGNORED)
+    private Date createTime;
+
+    private Date updateTime;
+
+    /** creator */
+    private Long userId;
+
+    public static FlinkCatalog of(FlinkCatalogParams flinkCatalogParams) {
+        if (flinkCatalogParams == null)
+            return null;
+        FlinkCatalog flinkCatalog = new FlinkCatalog();
+
+        BeanUtils.copyProperties(
+            flinkCatalogParams,
+            flinkCatalog,
+            "flinkJDBCCatalog",
+            "flinkHiveCatalog",
+            "flinkPaimonCatalog",
+            "customCatalogConfig");
+
+        try {
+            switch (flinkCatalogParams.getCatalogType()) {
+                case JDBC:
+                    flinkCatalog.setConfiguration(
+                        
JacksonUtils.write(flinkCatalogParams.getFlinkJDBCCatalog()));
+                    break;
+                case HIVE:
+                    flinkCatalog.setConfiguration(
+                        
JacksonUtils.write(flinkCatalogParams.getFlinkHiveCatalog()));
+                    break;
+                case PAIMON:
+                    flinkCatalog.setConfiguration(
+                        
JacksonUtils.write(flinkCatalogParams.getFlinkPaimonCatalog()));
+                    break;
+                case CUSTOM:
+                    
flinkCatalog.setConfiguration(flinkCatalogParams.getCustomCatalogConfig());
+                    break;
+            }
+        } catch (JsonProcessingException e) {
+            log.error("Flink catalog json read failed", e);
+            throw new RuntimeException("Flink catalog json read failed");
+        }
+        return flinkCatalog;
+    }
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/CatalogMapper.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/CatalogMapper.java
new file mode 100644
index 000000000..7053bee5b
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/CatalogMapper.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.mapper;
+
+import org.apache.streampark.console.core.entity.FlinkCatalog;
+
+import org.apache.ibatis.annotations.Param;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+
+/** catalog mapper */
+public interface CatalogMapper extends BaseMapper<FlinkCatalog> {
+
+    boolean existsByCatalogName(@Param("catalogName") String catalogName);
+
+    IPage<FlinkCatalog> selectPage(Page<FlinkCatalog> page, @Param("catalog") 
FlinkCatalog catalog);
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/CatalogService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/CatalogService.java
new file mode 100644
index 000000000..c19d467b4
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/CatalogService.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.core.bean.FlinkCatalogParams;
+import org.apache.streampark.console.core.entity.FlinkCatalog;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.service.IService;
+
+/** This interface is used to manage Flink catalogs. */
+public interface CatalogService extends IService<FlinkCatalog> {
+
+    /**
+     * Create Catalog
+     *
+     * @param catalog The {@link FlinkCatalogParams} object describing the catalog to create.
+     */
+    boolean create(FlinkCatalogParams catalog, Long userId);
+
+    /**
+     * Remove Catalog
+     *
+     * @param id The id of the catalog to remove.
+     */
+    boolean remove(Long id);
+
+    /**
+     * Retrieves a page of catalogs based on the provided parameters. Params: catalog – The
+     * Catalog object to be used for filtering the results. request – The REST 
request object
+     * containing additional parameters or headers. Returns: A page of Catalog 
objects based on the
+     * provided parameters.
+     */
+    IPage<FlinkCatalogParams> page(FlinkCatalogParams catalog, RestRequest 
request);
+
+    /**
+     * update Catalog
+     *
+     * @param catalog The {@link FlinkCatalogParams} object carrying the new catalog configuration.
+     */
+    boolean update(FlinkCatalogParams catalog, long userId);
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/CatalogServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/CatalogServiceImpl.java
new file mode 100644
index 000000000..199220fb0
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/CatalogServiceImpl.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service.impl;
+
+import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.base.exception.AlertException;
+import org.apache.streampark.console.base.exception.ApiAlertException;
+import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
+import org.apache.streampark.console.core.bean.FlinkCatalogParams;
+import org.apache.streampark.console.core.entity.FlinkCatalog;
+import org.apache.streampark.console.core.mapper.CatalogMapper;
+import org.apache.streampark.console.core.service.CatalogService;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.BeanUtils;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/** catalog manage */
+@Service
+@Slf4j
+@Transactional(propagation = Propagation.SUPPORTS, rollbackFor = 
Exception.class)
+public class CatalogServiceImpl extends ServiceImpl<CatalogMapper, 
FlinkCatalog>
+    implements
+        CatalogService {
+
+    private static final String CATALOG_REGEX = 
"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$";
+
+    @Override
+    public boolean create(FlinkCatalogParams catalog, Long userId) {
+        AlertException.throwIfNull(
+            catalog.getTeamId(), "The teamId can't be null. Create catalog 
failed.");
+        AlertException.throwIfFalse(
+            validateCatalogName(catalog.getCatalogName()),
+            "Catalog Name only lowercase letters, numbers, and -,.. Symbol 
composition, cannot end with a symbol.");
+        AlertException.throwIfTrue(
+            existsByCatalogName(catalog.getCatalogName()), "Catalog name  
already exists.");
+        FlinkCatalog flinkCatalog = FlinkCatalog.of(catalog);
+        Date date = new Date();
+        flinkCatalog.setCreateTime(date);
+        flinkCatalog.setUpdateTime(date);
+        return this.save(flinkCatalog);
+    }
+
+    @Override
+    public boolean remove(Long id) {
+        FlinkCatalog catalog = getById(id);
+        ApiAlertException.throwIfNull(catalog, "Catalog not exist, please 
check.");
+        return this.removeById(id);
+    }
+
+    @Override
+    public IPage<FlinkCatalogParams> page(FlinkCatalogParams catalog, 
RestRequest request) {
+        AlertException.throwIfNull(
+            catalog.getTeamId(), "The teamId can't be null. List catalog 
failed.");
+
+        Page<FlinkCatalog> page = MybatisPager.getPage(request);
+        this.baseMapper.selectPage(page, FlinkCatalog.of(catalog));
+        Page<FlinkCatalogParams> paramsPage = new Page<>();
+        BeanUtils.copyProperties(page, paramsPage, "records");
+        List<FlinkCatalogParams> paramList = new ArrayList<>();
+        page.getRecords()
+            .forEach(
+                record -> {
+                    paramList.add(FlinkCatalogParams.of(record));
+                });
+        paramsPage.setRecords(paramList);
+        return paramsPage;
+    }
+
+    @Override
+    public boolean update(FlinkCatalogParams catalogParam, long userId) {
+        AlertException.throwIfNull(
+            catalogParam.getTeamId(), "The teamId can't be null. List catalog 
failed.");
+        FlinkCatalog catalog = getById(catalogParam.getId());
+        FlinkCatalog flinkCatalog = FlinkCatalog.of(catalogParam);
+        AlertException.throwIfFalse(
+            
catalogParam.getCatalogName().equalsIgnoreCase(catalog.getCatalogName()),
+            "The catalog name cannot be modified.");
+        log.debug(
+            "Catalog {} has modify from {} to {}",
+            catalog.getCatalogName(),
+            catalog.getConfiguration(),
+            flinkCatalog.getConfiguration());
+        catalog.setConfiguration(flinkCatalog.getConfiguration());
+        catalog.setUpdateTime(new Date());
+        catalog.setUserId(userId);
+        return this.updateById(catalog);
+    }
+
+    public Boolean existsByCatalogName(String catalogName) {
+        return this.baseMapper.existsByCatalogName(catalogName);
+    }
+
+    /** validate catalog name */
+    private boolean validateCatalogName(String catalogName) {
+        return Pattern.compile(CATALOG_REGEX).matcher(catalogName).matches();
+    }
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
 
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index e3fca631f..037a4ab0e 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++ 
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -575,3 +575,18 @@ create table if not exists `t_spark_app` (
     `hadoop_user` varchar(500) default null,
     primary key(`id`)
     );
+
+    -- ----------------------------
+    -- Table structure for t_flink_catalog
+    -- ----------------------------
+    create table if not exists t_flink_catalog (
+       `id` BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1),
+       `team_id` bigint not null,
+       `user_id` bigint default null,
+       `catalog_type` varchar(255) not NULL,
+       `catalog_name` VARCHAR(255) NOT NULL,
+       `configuration` text,
+       `create_time` TIMESTAMP WITHOUT TIME ZONE DEFAULT NULL,
+       `update_time` TIMESTAMP WITHOUT TIME ZONE DEFAULT NULL,
+       CONSTRAINT uniq_catalog_name UNIQUE (`catalog_name`)
+    );
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/CatalogMapper.xml
 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/CatalogMapper.xml
new file mode 100644
index 000000000..9d1ffde45
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/CatalogMapper.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="org.apache.streampark.console.core.mapper.CatalogMapper">
+    <resultMap id="BaseResultMap" 
type="org.apache.streampark.console.core.entity.FlinkCatalog">
+        <id column="id" jdbcType="BIGINT" property="id"/>
+        <result column="team_id" jdbcType="BIGINT" property="teamId"/>
+        <result column="catalog_type" jdbcType="VARCHAR" 
property="catalogType"/>
+        <result column="catalog_name" jdbcType="VARCHAR" 
property="catalogName"/>
+        <result column="configuration" jdbcType="LONGVARCHAR" 
property="configuration"/>
+        <result column="createTime" jdbcType="DATE" property="createTime"/>
+        <result column="updateTime" jdbcType="DATE" property="updateTime"/>
+        <result column="user_Id" jdbcType="LONGVARCHAR" property="userId"/>
+    </resultMap>
+
+    <select id="existsByCatalogName" resultType="java.lang.Boolean" 
parameterType="java.lang.String">
+        select
+        CASE
+        WHEN  count(1) > 0 THEN true ELSE false
+        END
+        from t_flink_catalog
+        where catalog_name = #{catalogName}
+        limit 1
+    </select>
+
+    <select id="selectPage" 
resultType="org.apache.streampark.console.core.entity.FlinkCatalog" 
parameterType="org.apache.streampark.console.core.entity.FlinkCatalog">
+        select * from t_flink_catalog
+        <where>
+            team_id = #{catalog.teamId}
+            <if test="catalog.catalogName != null and catalog.catalogName != 
''">
+                and catalog_name = #{catalog.catalogName}
+            </if>
+            <if test="catalog.userId != null and catalog.userId != ''">
+                and user_id = #{catalog.userId}
+            </if>
+        </where>
+    </select>
+</mapper>
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/CatalogServiceTest.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/CatalogServiceTest.java
new file mode 100644
index 000000000..23decd3bd
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/CatalogServiceTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.common.enums.CatalogType;
+import org.apache.streampark.console.SpringUnitTestBase;
+import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.core.bean.FlinkCatalogParams;
+
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link CatalogService} covering creation, update and removal of a JDBC catalog.
+ *
+ * <p>Every test creates the record it needs, and {@link #cleanTestRecordsInDatabase()} runs
+ * after each test, so no test depends on rows left behind by another one.
+ */
+public class CatalogServiceTest extends SpringUnitTestBase {
+
+    @Autowired
+    private CatalogService catalogService;
+
+    /**
+     * Cleans up after each test by calling {@code remove} with a {@link QueryWrapper} that
+     * carries no conditions.
+     */
+    @AfterEach
+    void cleanTestRecordsInDatabase() {
+        catalogService.remove(new QueryWrapper<>());
+    }
+
+    /** Creating a fully populated JDBC catalog must report success. */
+    @Test
+    @Order(1)
+    public void create() {
+        FlinkCatalogParams catalog = newJdbcCatalog("url");
+
+        assertThat(catalogService.create(catalog, 1L)).isTrue();
+    }
+
+    /**
+     * Updating a stored catalog must persist the new base URL and the id passed to
+     * {@code update}, and must keep the catalog type.
+     */
+    @Test
+    @Order(2)
+    public void update() {
+        FlinkCatalogParams catalog = newJdbcCatalog("url1");
+        RestRequest request = new RestRequest();
+        catalogService.create(catalog, 1L);
+
+        FlinkCatalogParams stored = firstRecord(catalogService.page(catalog, request));
+        stored.getFlinkJDBCCatalog().setBaseUrl("url2");
+        catalogService.update(stored, 2L);
+
+        FlinkCatalogParams updated = firstRecord(catalogService.page(catalog, request));
+        // Assert on the values themselves rather than on pre-computed booleans, so that a
+        // failure reports the actual value instead of "expected true but was false".
+        assertThat(updated.getFlinkJDBCCatalog().getBaseUrl()).contains("url2");
+        assertThat(updated.getUserId()).isEqualTo(2L);
+        assertThat(updated.getCatalogType()).isEqualTo(CatalogType.JDBC);
+    }
+
+    /** Removing a stored catalog by its id must report success. */
+    @Test
+    @Order(3)
+    public void remove() {
+        FlinkCatalogParams catalog = newJdbcCatalog("url");
+        catalogService.create(catalog, 1L);
+
+        FlinkCatalogParams stored = firstRecord(catalogService.page(catalog, new RestRequest()));
+
+        assertThat(catalogService.remove(stored.getId())).isTrue();
+    }
+
+    /**
+     * Returns the first record of {@code page}, failing with an assertion error rather than
+     * an {@link IndexOutOfBoundsException} when the page holds no records.
+     *
+     * @param page query result to read from
+     * @return the record at index 0
+     */
+    private static FlinkCatalogParams firstRecord(IPage<FlinkCatalogParams> page) {
+        assertThat(page.getRecords()).isNotEmpty();
+        return page.getRecords().get(0);
+    }
+
+    /**
+     * Builds the JDBC catalog parameters shared by all tests; only the base URL varies.
+     *
+     * @param baseUrl value stored as the base URL of the nested JDBC catalog settings
+     * @return parameters for team {@code 1}, named {@code "catalog-name"}, of type JDBC
+     */
+    private static FlinkCatalogParams newJdbcCatalog(String baseUrl) {
+        FlinkCatalogParams.FlinkJDBCCatalog jdbc = new FlinkCatalogParams.FlinkJDBCCatalog();
+        jdbc.setType("jdbc");
+        jdbc.setDefaultDatabase("aa");
+        jdbc.setPassword("11");
+        jdbc.setUsername("user");
+        jdbc.setBaseUrl(baseUrl);
+
+        FlinkCatalogParams catalog = new FlinkCatalogParams();
+        catalog.setTeamId(1L);
+        catalog.setCatalogType(CatalogType.JDBC);
+        catalog.setCatalogName("catalog-name");
+        catalog.setFlinkJDBCCatalog(jdbc);
+        return catalog;
+    }
+}


Reply via email to