This is an automated email from the ASF dual-hosted git repository.

monster pushed a commit to branch schedule
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/schedule by this push:
     new 43bf1f3d5 Service
43bf1f3d5 is described below

commit 43bf1f3d54d4bca9885e4d020c98aa19f58ebd78
Author: chenqqq11 <[email protected]>
AuthorDate: Thu Jun 29 15:32:02 2023 +0800

    Service
---
 pom.xml                                            |   1 +
 .../apache/streampark/common/util/DateUtils.scala  |   7 +
 .../base/config/QuartzExecutorConfiguration.java   |  15 ++
 .../ScheduleParam.java}                            |  20 +-
 .../streampark/console/core/entity/Schedule.java   |   6 +-
 .../console/core/mapper/SchedulerMapper.java       |   7 +-
 .../console/core/quartz/JobScheduleTask.java       |   1 -
 .../console/core/quartz/QuartzExecutors.java       |   7 +-
 .../console/core/service/SchedulerService.java     |  10 +-
 .../core/service/impl/SchedulerServiceImpl.java    | 143 ++++++++++-
 .../main/resources/mapper/core/SchedulerMapper.xml |  32 +++
 streampark-flink/streampark-flink-catalog/pom.xml  |  60 +++++
 .../src/main/java/catalog/StreamParkCatalog.java   | 263 +++++++++++++++++++++
 .../main/java/catalog/dao/CatalogResourceDao.java  |   9 +
 14 files changed, 550 insertions(+), 31 deletions(-)

diff --git a/pom.xml b/pom.xml
index abd9e91cf..91df7093c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,6 +81,7 @@
         <module>streampark-spark</module>
         <module>streampark-storage</module>
         <module>streampark-console</module>
+        <module>streampark-flink/streampark-flink-catalog</module>
     </modules>
 
     <properties>
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala
index fe91ee2b4..4c123d294 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala
@@ -252,4 +252,11 @@ object DateUtils {
       .map(TimeZone.getTimeZone)
   }
 
+  def differSec(d1: Date, d2: Date): Long = {
+    if (d1 == null || d2 == null) return 0
+    Math.ceil(differMs(d1, d2) / 1000.0).toLong
+  }
+
+  def differMs(d1: Date, d2: Date): Long = Math.abs(d1.getTime - d2.getTime)
+
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/QuartzExecutorConfiguration.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/QuartzExecutorConfiguration.java
new file mode 100644
index 000000000..63582e1be
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/QuartzExecutorConfiguration.java
@@ -0,0 +1,15 @@
+package org.apache.streampark.console.base.config;
+
+import org.apache.streampark.console.core.quartz.QuartzExecutors;
+import org.apache.streampark.console.core.quartz.SchedulerApi;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class QuartzExecutorConfiguration {
+
+    @Bean
+    public SchedulerApi schedulerApi() {
+        return new QuartzExecutors();
+    }
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SchedulerMapper.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/ScheduleParam.java
similarity index 68%
copy from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SchedulerMapper.java
copy to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/ScheduleParam.java
index ac672e345..55f8ac70f 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SchedulerMapper.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/ScheduleParam.java
@@ -15,10 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.console.core.mapper;
+package org.apache.streampark.console.core.bean;
 
-import org.apache.streampark.console.core.entity.Schedule;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
 
-import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import java.util.Date;
 
-public interface SchedulerMapper extends BaseMapper<Schedule> {}
+/** schedule parameters */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class ScheduleParam {
+
+  private Date startTime;
+  private Date endTime;
+  private String crontab;
+  private String timezoneId;
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Schedule.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Schedule.java
index 903088f5b..074cb6e6e 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Schedule.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Schedule.java
@@ -20,7 +20,6 @@ package org.apache.streampark.console.core.entity;
 import org.apache.streampark.console.core.enums.ScheduleState;
 
 import com.baomidou.mybatisplus.annotation.IdType;
-import com.baomidou.mybatisplus.annotation.TableField;
 import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;
 import com.fasterxml.jackson.annotation.JsonFormat;
@@ -35,10 +34,7 @@ public class Schedule {
   @TableId(value = "id", type = IdType.AUTO)
   private int id;
 
-  private Long appId;
-
-  @TableField(exist = false)
-  private String description;
+  private long appId;
 
   /** schedule start time */
   @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SchedulerMapper.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SchedulerMapper.java
index ac672e345..a84fbda61 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SchedulerMapper.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SchedulerMapper.java
@@ -19,6 +19,11 @@ package org.apache.streampark.console.core.mapper;
 
 import org.apache.streampark.console.core.entity.Schedule;
 
+import org.apache.ibatis.annotations.Param;
+
 import com.baomidou.mybatisplus.core.mapper.BaseMapper;
 
-public interface SchedulerMapper extends BaseMapper<Schedule> {}
+public interface SchedulerMapper extends BaseMapper<Schedule> {
+
+  Schedule queryByAppId(@Param("appId") long appId);
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/JobScheduleTask.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/JobScheduleTask.java
index 928b058ae..4c46993ec 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/JobScheduleTask.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/JobScheduleTask.java
@@ -62,7 +62,6 @@ public class JobScheduleTask extends QuartzJobBean {
           "Job schedule does not exist in db or process schedule offline,delete schedule job in quartz, scheduleId:{}.",
           scheduleId);
       deleteJob(context, scheduleId);
-      return;
     }
     // start flink job
     // .......
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/QuartzExecutors.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/QuartzExecutors.java
index 2ba57af09..dd1486ffd 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/QuartzExecutors.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/QuartzExecutors.java
@@ -29,6 +29,7 @@ import org.quartz.JobKey;
 import org.quartz.Scheduler;
 import org.quartz.SchedulerException;
 import org.quartz.TriggerKey;
+import org.springframework.beans.factory.annotation.Autowired;
 
 import java.util.Date;
 import java.util.Map;
@@ -42,13 +43,9 @@ import static org.quartz.TriggerBuilder.newTrigger;
 @Slf4j
 public class QuartzExecutors implements SchedulerApi {
 
-  private final Scheduler scheduler;
+  @Autowired Scheduler scheduler;
   private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-  public QuartzExecutors(Scheduler scheduler) {
-    this.scheduler = scheduler;
-  }
-
   @SneakyThrows
   @Override
   public void start() {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SchedulerService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SchedulerService.java
index 4e5f6583b..da8182400 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SchedulerService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SchedulerService.java
@@ -17,8 +17,8 @@
 
 package org.apache.streampark.console.core.service;
 
+import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.core.entity.Schedule;
-import org.apache.streampark.console.core.enums.ReleaseState;
 import org.apache.streampark.console.core.enums.ScheduleState;
 
 import com.baomidou.mybatisplus.core.metadata.IPage;
@@ -32,13 +32,13 @@ public interface SchedulerService extends 
IService<Schedule> {
 
   boolean updateSchedule(Long appId, String scheduleExpression);
 
-  Map<String, Object> setScheduleState(Long appId, ReleaseState 
scheduleStatus);
+  void setScheduleState(Long appId, ScheduleState scheduleStatus);
 
-  boolean deleteSchedule(int scheduleId);
+  void deleteSchedule(int scheduleId);
 
   Map<String, Object> previewSchedule(String schedule);
 
-  IPage<Schedule> page(Schedule savePoint, ScheduleState request);
+  IPage<Schedule> page(Schedule schedule, RestRequest request);
 
-  Schedule querySchedule(int id);
+  Schedule querySchedule(int appId);
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SchedulerServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SchedulerServiceImpl.java
index 2714eb1eb..f9b38b518 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SchedulerServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SchedulerServiceImpl.java
@@ -17,41 +17,160 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.util.DateUtils;
+import org.apache.streampark.common.util.JsonUtils;
+import org.apache.streampark.console.base.domain.Constant;
+import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
+import org.apache.streampark.console.core.bean.ScheduleParam;
 import org.apache.streampark.console.core.entity.Schedule;
-import org.apache.streampark.console.core.enums.ReleaseState;
 import org.apache.streampark.console.core.enums.ScheduleState;
 import org.apache.streampark.console.core.mapper.SchedulerMapper;
+import org.apache.streampark.console.core.quartz.SchedulerApi;
 import org.apache.streampark.console.core.service.SchedulerService;
 
+import org.apache.commons.lang3.StringUtils;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
 
+import java.util.Date;
 import java.util.Map;
+import java.util.Objects;
 
 /** scheduler service impl */
 @Service
+@Slf4j
 public class SchedulerServiceImpl extends ServiceImpl<SchedulerMapper, 
Schedule>
     implements SchedulerService {
 
+  @Autowired private SchedulerMapper schedulerMapper;
+
+  @Autowired private SchedulerApi schedulerApi;
+
   @Override
+  @Transactional
   public boolean insertSchedule(Long appId, String schedule) {
+    Schedule scheduleObj = new Schedule();
+    Date now = new Date();
+
+    scheduleObj.setAppId(appId);
+    ScheduleParam scheduleParam = JsonUtils.read(schedule, 
ScheduleParam.class);
+    if (DateUtils.differSec(scheduleParam.getStartTime(), 
scheduleParam.getEndTime()) == 0) {
+      log.warn("The start time must not be the same as the end");
+      return false;
+    }
+    if (scheduleParam.getStartTime().getTime() > 
scheduleParam.getEndTime().getTime()) {
+      log.warn("The start time must smaller than end time");
+      return false;
+    }
+    scheduleObj.setStartTime(scheduleParam.getStartTime());
+    scheduleObj.setEndTime(scheduleParam.getEndTime());
+    if 
(!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) {
+      log.error("{} verify failure", scheduleParam.getCrontab());
+      return false;
+    }
+    scheduleObj.setCrontab(scheduleParam.getCrontab());
+    scheduleObj.setTimezoneId(scheduleParam.getTimezoneId());
+    scheduleObj.setCreateTime(now);
+    scheduleObj.setUpdateTime(now);
+    scheduleObj.setScheduleState(ScheduleState.OFFLINE);
+    schedulerMapper.insert(scheduleObj);
+
     return false;
   }
 
   @Override
+  @Transactional
   public boolean updateSchedule(Long appId, String scheduleExpression) {
-    return false;
+    // check schedule exists
+    Schedule schedule = schedulerMapper.selectById(appId);
+
+    if (schedule == null) {
+      return false;
+    }
+
+    Date now = new Date();
+
+    if (!StringUtils.isEmpty(scheduleExpression)) {
+      ScheduleParam scheduleParam = JsonUtils.read(scheduleExpression, 
ScheduleParam.class);
+      if (scheduleParam == null) {
+        return false;
+      }
+      if (DateUtils.differSec(scheduleParam.getStartTime(), 
scheduleParam.getEndTime()) == 0) {
+        log.warn("The start time must not be the same as the end");
+        return false;
+      }
+      if (scheduleParam.getStartTime().getTime() > 
scheduleParam.getEndTime().getTime()) {
+        log.warn("The start time must smaller than end time");
+        return false;
+      }
+
+      schedule.setStartTime(scheduleParam.getStartTime());
+      schedule.setEndTime(scheduleParam.getEndTime());
+      if 
(!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) {
+        return false;
+      }
+      schedule.setCrontab(scheduleParam.getCrontab());
+      schedule.setTimezoneId(scheduleParam.getTimezoneId());
+    }
+
+    schedule.setUpdateTime(now);
+    schedulerMapper.updateById(schedule);
+
+    return true;
   }
 
   @Override
-  public Map<String, Object> setScheduleState(Long appId, ReleaseState 
scheduleStatus) {
-    return null;
+  @Transactional
+  public void setScheduleState(Long appId, ScheduleState scheduleStatus) {
+    // check schedule exists
+    Schedule scheduleObj = schedulerMapper.selectById(appId);
+
+    if (scheduleObj == null) {
+      log.error("Schedule does not exist, scheduleId:{}.", appId);
+    }
+    // check schedule release state
+    if (Objects.requireNonNull(scheduleObj).getScheduleState() == 
scheduleStatus) {
+      log.warn(
+          "Schedule state does not need to change due to schedule state is already {}, scheduleId:{}.",
+          scheduleObj.getScheduleState().getDescp(),
+          scheduleObj.getId());
+    }
+    scheduleObj.setScheduleState(scheduleStatus);
+
+    schedulerMapper.updateById(scheduleObj);
+    try {
+      switch (scheduleStatus) {
+        case ONLINE:
+          log.info("Call master client set schedule online");
+          setSchedule(scheduleObj);
+          break;
+        case OFFLINE:
+          log.info("Call master client set schedule offline");
+          deleteSchedule(scheduleObj.getId());
+          break;
+        default:
+      }
+    } catch (Exception e) {
+    }
+  }
+
+  public void setSchedule(Schedule schedule) {
+    log.info("set schedule, scheduleId: {}", schedule.getId());
+    schedulerApi.insertOrUpdateScheduleTask(schedule);
   }
 
   @Override
-  public boolean deleteSchedule(int scheduleId) {
-    return false;
+  public void deleteSchedule(int scheduleId) {
+    log.info("delete schedules of schedule id:{}", scheduleId);
+    schedulerApi.deleteScheduleTask(scheduleId);
   }
 
   @Override
@@ -60,12 +179,16 @@ public class SchedulerServiceImpl extends 
ServiceImpl<SchedulerMapper, Schedule>
   }
 
   @Override
-  public IPage<Schedule> page(Schedule savePoint, ScheduleState request) {
-    return null;
+  public IPage<Schedule> page(Schedule schedule, RestRequest request) {
+    Page<Schedule> page =
+        new MybatisPager<Schedule>().getPage(request, "option_time", 
Constant.ORDER_DESC);
+    LambdaQueryWrapper<Schedule> queryWrapper =
+        new LambdaQueryWrapper<Schedule>().eq(Schedule::getAppId, 
schedule.getAppId());
+    return this.page(page, queryWrapper);
   }
 
   @Override
-  public Schedule querySchedule(int id) {
-    return getById(id);
+  public Schedule querySchedule(int appId) {
+    return schedulerMapper.queryByAppId(appId);
   }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/SchedulerMapper.xml
 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/SchedulerMapper.xml
new file mode 100644
index 000000000..818a985b3
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/SchedulerMapper.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
+<mapper namespace="org.apache.streampark.console.core.mapper.SchedulerMapper">
+    <sql id="baseSql">
+        id, appId, start_time, end_time, timezone_id, crontab, create_time, 
update_time,schedule_state
+    </sql>
+
+    <select id="queryByAppId"
+            resultType="org.apache.streampark.console.core.entity.Schedule">
+        select
+        <include refid="baseSql"/>
+        from t_flink_schedules
+        where appId = #{appId} and schedule_state = 1
+    </select>
+</mapper>
diff --git a/streampark-flink/streampark-flink-catalog/pom.xml 
b/streampark-flink/streampark-flink-catalog/pom.xml
new file mode 100644
index 000000000..1eb8f4d14
--- /dev/null
+++ b/streampark-flink/streampark-flink-catalog/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>streampark</artifactId>
+        <groupId>org.apache.streampark</groupId>
+        <version>2.2.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>streampark-flink-catalog</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>${flink.table.uber.artifact.id}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+    </dependencies>
+
+</project>
diff --git 
a/streampark-flink/streampark-flink-catalog/src/main/java/catalog/StreamParkCatalog.java
 
b/streampark-flink/streampark-flink-catalog/src/main/java/catalog/StreamParkCatalog.java
new file mode 100644
index 000000000..7ddc127b9
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-catalog/src/main/java/catalog/StreamParkCatalog.java
@@ -0,0 +1,263 @@
+package catalog;
+
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+public class StreamParkCatalog implements Catalog {
+    private final String catalogName;
+    private final Map<String, String> options;
+    private final CatalogResourceDao dao;
+    private final TableEnvClassLoader tEnvClassLoader;
+
+    private final ConnectorClassLoaderProvider connectorClassLoaderProvider;
+
+    private final UdfClassLoaderProvider udfClassLoaderProvider;
+
+    public StreamParkCatalog(String catalogName, Map<String, String> options, 
CatalogResourceDao dao, TableEnvClassLoader tEnvClassLoader, @Nullable 
ConnectorClassLoaderProvider connectorClassLoaderProvider, @Nullable 
UdfClassLoaderProvider udfClassLoaderProvider) {
+        Objects.requireNonNull(catalogName);
+        Objects.requireNonNull(options);
+        Objects.requireNonNull(dao);
+        Objects.requireNonNull(tEnvClassLoader);
+        if (!options.containsKey("defaultDatabase")) {
+            throw new CatalogException(String.format("Catalog `%s` is missing required property '%s'.", catalogName.getResourceId(), "defaultDatabase"));
+        } else if (((String)options.get("defaultDatabase")).isBlank()) {
+            throw new CatalogException(String.format("'%s' property of catalog `%s` must not be empty or blank.", "defaultDatabase", catalogName.getResourceId()));
+        } else {
+            this.catalogName = catalogName;
+            this.options = options;
+            this.dao = dao;
+            this.tEnvClassLoader = tEnvClassLoader;
+            this.connectorClassLoaderProvider = connectorClassLoaderProvider;
+            this.udfClassLoaderProvider = udfClassLoaderProvider;
+            if (udfClassLoaderProvider != null) {
+                
this.tEnvClassLoader.addClassLoader(udfClassLoaderProvider.createUdfClassLoader(catalogName.getNamespace()));
+            }
+
+            this.createDefaultDatabase();
+        }
+    }
+
+
+    @Override
+    public void open() throws CatalogException {
+
+    }
+
+    @Override
+    public void close() throws CatalogException {
+
+    }
+
+    @Override
+    public String getDefaultDatabase() throws CatalogException {
+        return this.options.get("defaultDatabase");
+
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        return Optional.of(new 
ClassloaderFunctionDefinitionFactory(this.tEnvClassLoader));
+    }
+
+    @Override
+    public CatalogDatabase getDatabase(String s) throws 
DatabaseNotExistException, CatalogException {
+        return null;
+    }
+
+    @Override
+    public boolean databaseExists(String s) throws CatalogException {
+        return false;
+    }
+
+    @Override
+    public void createDatabase(String s, CatalogDatabase catalogDatabase, 
boolean b) throws DatabaseAlreadyExistException, CatalogException {
+
+    }
+
+    @Override
+    public void dropDatabase(String s, boolean b, boolean b1) throws 
DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+
+    }
+
+    @Override
+    public void alterDatabase(String s, CatalogDatabase catalogDatabase, 
boolean b) throws DatabaseNotExistException, CatalogException {
+
+    }
+
+    @Override
+    public List<String> listTables(String s) throws DatabaseNotExistException, 
CatalogException {
+        return null;
+    }
+
+    @Override
+    public List<String> listViews(String s) throws DatabaseNotExistException, 
CatalogException {
+        return null;
+    }
+
+    @Override
+    public CatalogBaseTable getTable(ObjectPath objectPath) throws 
TableNotExistException, CatalogException {
+        return null;
+    }
+
+    @Override
+    public boolean tableExists(ObjectPath objectPath) throws CatalogException {
+        return false;
+    }
+
+    @Override
+    public void dropTable(ObjectPath objectPath, boolean b) throws 
TableNotExistException, CatalogException {
+
+    }
+
+    @Override
+    public void renameTable(ObjectPath objectPath, String s, boolean b) throws 
TableNotExistException, TableAlreadyExistException, CatalogException {
+
+    }
+
+    @Override
+    public void createTable(ObjectPath objectPath, CatalogBaseTable 
catalogBaseTable, boolean b) throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+
+    }
+
+    @Override
+    public void alterTable(ObjectPath objectPath, CatalogBaseTable 
catalogBaseTable, boolean b) throws TableNotExistException, CatalogException {
+
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(ObjectPath objectPath) 
throws TableNotExistException, TableNotPartitionedException, CatalogException {
+        return null;
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(ObjectPath objectPath, 
CatalogPartitionSpec catalogPartitionSpec) throws TableNotExistException, 
TableNotPartitionedException, PartitionSpecInvalidException, CatalogException {
+        return null;
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath 
objectPath, List<Expression> list) throws TableNotExistException, 
TableNotPartitionedException, CatalogException {
+        return null;
+    }
+
+    @Override
+    public CatalogPartition getPartition(ObjectPath objectPath, 
CatalogPartitionSpec catalogPartitionSpec) throws PartitionNotExistException, 
CatalogException {
+        return null;
+    }
+
+    @Override
+    public boolean partitionExists(ObjectPath objectPath, CatalogPartitionSpec 
catalogPartitionSpec) throws CatalogException {
+        return false;
+    }
+
+    @Override
+    public void createPartition(ObjectPath objectPath, CatalogPartitionSpec 
catalogPartitionSpec, CatalogPartition catalogPartition, boolean b) throws 
TableNotExistException, TableNotPartitionedException, 
PartitionSpecInvalidException, PartitionAlreadyExistsException, 
CatalogException {
+
+    }
+
+    @Override
+    public void dropPartition(ObjectPath objectPath, CatalogPartitionSpec 
catalogPartitionSpec, boolean b) throws PartitionNotExistException, 
CatalogException {
+
+    }
+
+    @Override
+    public void alterPartition(ObjectPath objectPath, CatalogPartitionSpec 
catalogPartitionSpec, CatalogPartition catalogPartition, boolean b) throws 
PartitionNotExistException, CatalogException {
+
+    }
+
+    @Override
+    public List<String> listFunctions(String s) throws 
DatabaseNotExistException, CatalogException {
+        return null;
+    }
+
+    @Override
+    public CatalogFunction getFunction(ObjectPath objectPath) throws 
FunctionNotExistException, CatalogException {
+        return null;
+    }
+
+    @Override
+    public boolean functionExists(ObjectPath objectPath) throws 
CatalogException {
+        return false;
+    }
+
+    @Override
+    public void createFunction(ObjectPath objectPath, CatalogFunction 
catalogFunction, boolean b) throws FunctionAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+
+    }
+
+    @Override
+    public void alterFunction(ObjectPath objectPath, CatalogFunction 
catalogFunction, boolean b) throws FunctionNotExistException, CatalogException {
+
+    }
+
+    @Override
+    public void dropFunction(ObjectPath objectPath, boolean b) throws 
FunctionNotExistException, CatalogException {
+
+    }
+
+    @Override
+    public CatalogTableStatistics getTableStatistics(ObjectPath objectPath) 
throws TableNotExistException, CatalogException {
+        return null;
+    }
+
+    @Override
+    public CatalogColumnStatistics getTableColumnStatistics(ObjectPath 
objectPath) throws TableNotExistException, CatalogException {
+        return null;
+    }
+
+    @Override
+    public CatalogTableStatistics getPartitionStatistics(ObjectPath 
objectPath, CatalogPartitionSpec catalogPartitionSpec) throws 
PartitionNotExistException, CatalogException {
+        return null;
+    }
+
+    @Override
+    public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath 
objectPath, CatalogPartitionSpec catalogPartitionSpec) throws 
PartitionNotExistException, CatalogException {
+        return null;
+    }
+
+    @Override
+    public void alterTableStatistics(ObjectPath objectPath, 
CatalogTableStatistics catalogTableStatistics, boolean b) throws 
TableNotExistException, CatalogException {
+
+    }
+
+    @Override
+    public void alterTableColumnStatistics(ObjectPath objectPath, 
CatalogColumnStatistics catalogColumnStatistics, boolean b) throws 
TableNotExistException, CatalogException, TablePartitionedException {
+
+    }
+
+    @Override
+    public void alterPartitionStatistics(ObjectPath objectPath, 
CatalogPartitionSpec catalogPartitionSpec, CatalogTableStatistics 
catalogTableStatistics, boolean b) throws PartitionNotExistException, 
CatalogException {
+
+    }
+
+    @Override
+    public void alterPartitionColumnStatistics(ObjectPath objectPath, 
CatalogPartitionSpec catalogPartitionSpec, CatalogColumnStatistics 
catalogColumnStatistics, boolean b) throws PartitionNotExistException, 
CatalogException {
+
+    }
+}
diff --git 
a/streampark-flink/streampark-flink-catalog/src/main/java/catalog/dao/CatalogResourceDao.java
 
b/streampark-flink/streampark-flink-catalog/src/main/java/catalog/dao/CatalogResourceDao.java
new file mode 100644
index 000000000..5e675784a
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-catalog/src/main/java/catalog/dao/CatalogResourceDao.java
@@ -0,0 +1,9 @@
+package catalog.dao;
+
+import java.util.List;
+import java.util.Optional;
+
+public interface CatalogResourceDao {
+    List<Database> listDatabases(String catalogName);
+
+}

Reply via email to