This is an automated email from the ASF dual-hosted git repository.
monster pushed a commit to branch schedule
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/schedule by this push:
new 43bf1f3d5 Service
43bf1f3d5 is described below
commit 43bf1f3d54d4bca9885e4d020c98aa19f58ebd78
Author: chenqqq11 <[email protected]>
AuthorDate: Thu Jun 29 15:32:02 2023 +0800
Service
---
pom.xml | 1 +
.../apache/streampark/common/util/DateUtils.scala | 7 +
.../base/config/QuartzExecutorConfiguration.java | 15 ++
.../ScheduleParam.java} | 20 +-
.../streampark/console/core/entity/Schedule.java | 6 +-
.../console/core/mapper/SchedulerMapper.java | 7 +-
.../console/core/quartz/JobScheduleTask.java | 1 -
.../console/core/quartz/QuartzExecutors.java | 7 +-
.../console/core/service/SchedulerService.java | 10 +-
.../core/service/impl/SchedulerServiceImpl.java | 143 ++++++++++-
.../main/resources/mapper/core/SchedulerMapper.xml | 32 +++
streampark-flink/streampark-flink-catalog/pom.xml | 60 +++++
.../src/main/java/catalog/StreamParkCatalog.java | 263 +++++++++++++++++++++
.../main/java/catalog/dao/CatalogResourceDao.java | 9 +
14 files changed, 550 insertions(+), 31 deletions(-)
diff --git a/pom.xml b/pom.xml
index abd9e91cf..91df7093c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,6 +81,7 @@
<module>streampark-spark</module>
<module>streampark-storage</module>
<module>streampark-console</module>
+ <module>streampark-flink/streampark-flink-catalog</module>
</modules>
<properties>
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala
index fe91ee2b4..4c123d294 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala
@@ -252,4 +252,11 @@ object DateUtils {
.map(TimeZone.getTimeZone)
}
+ def differSec(d1: Date, d2: Date): Long = {
+ if (d1 == null || d2 == null) return 0
+ Math.ceil(differMs(d1, d2) / 1000.0).toLong
+ }
+
+ def differMs(d1: Date, d2: Date): Long = Math.abs(d1.getTime - d2.getTime)
+
}
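For reference, the two helpers compose: differSec rounds the millisecond gap up to whole seconds, so any two distinct instants compare as at least one second apart. A standalone Java sketch of the same semantics (illustrative only, not part of this commit):

import java.util.Date;

public class DateUtilsSketch {
  // absolute gap in milliseconds, mirroring DateUtils.differMs
  static long differMs(Date d1, Date d2) {
    return Math.abs(d1.getTime() - d2.getTime());
  }

  // gap rounded up to whole seconds, 0 when either date is null, mirroring DateUtils.differSec
  static long differSec(Date d1, Date d2) {
    if (d1 == null || d2 == null) return 0;
    return (long) Math.ceil(differMs(d1, d2) / 1000.0);
  }

  public static void main(String[] args) {
    System.out.println(differSec(new Date(0), new Date(1500))); // 2: 1.5s rounds up
    System.out.println(differSec(new Date(0), new Date(0)));    // 0: the "same instant" case the service checks for
  }
}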
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/QuartzExecutorConfiguration.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/QuartzExecutorConfiguration.java
new file mode 100644
index 000000000..63582e1be
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/QuartzExecutorConfiguration.java
@@ -0,0 +1,15 @@
+package org.apache.streampark.console.base.config;
+
+import org.apache.streampark.console.core.quartz.QuartzExecutors;
+import org.apache.streampark.console.core.quartz.SchedulerApi;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class QuartzExecutorConfiguration {
+
+ @Bean
+ public SchedulerApi schedulerApi() {
+ return new QuartzExecutors();
+ }
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SchedulerMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/ScheduleParam.java
similarity index 68%
copy from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SchedulerMapper.java
copy to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/ScheduleParam.java
index ac672e345..55f8ac70f 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SchedulerMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/ScheduleParam.java
@@ -15,10 +15,22 @@
* limitations under the License.
*/
-package org.apache.streampark.console.core.mapper;
+package org.apache.streampark.console.core.bean;
-import org.apache.streampark.console.core.entity.Schedule;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
-import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import java.util.Date;
-public interface SchedulerMapper extends BaseMapper<Schedule> {}
+/** schedule parameters */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class ScheduleParam {
+
+ private Date startTime;
+ private Date endTime;
+ private String crontab;
+ private String timezoneId;
+}
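The schedule strings passed to SchedulerService are deserialized into this bean by JsonUtils.read (see SchedulerServiceImpl below). A plausible payload, assuming the dates follow the yyyy-MM-dd HH:mm:ss pattern used on the Schedule entity:

{
  "startTime": "2023-06-29 00:00:00",
  "endTime": "2023-12-31 00:00:00",
  "crontab": "0 0 2 * * ?",
  "timezoneId": "Asia/Shanghai"
}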
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Schedule.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Schedule.java
index 903088f5b..074cb6e6e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Schedule.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Schedule.java
@@ -20,7 +20,6 @@ package org.apache.streampark.console.core.entity;
import org.apache.streampark.console.core.enums.ScheduleState;
import com.baomidou.mybatisplus.annotation.IdType;
-import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
@@ -35,10 +34,7 @@ public class Schedule {
@TableId(value = "id", type = IdType.AUTO)
private int id;
- private Long appId;
-
- @TableField(exist = false)
- private String description;
+ private long appId;
/** schedule start time */
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SchedulerMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SchedulerMapper.java
index ac672e345..a84fbda61 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SchedulerMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SchedulerMapper.java
@@ -19,6 +19,11 @@ package org.apache.streampark.console.core.mapper;
import org.apache.streampark.console.core.entity.Schedule;
+import org.apache.ibatis.annotations.Param;
+
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-public interface SchedulerMapper extends BaseMapper<Schedule> {}
+public interface SchedulerMapper extends BaseMapper<Schedule> {
+
+ Schedule queryByAppId(@Param("appId") long appId);
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/JobScheduleTask.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/JobScheduleTask.java
index 928b058ae..4c46993ec 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/JobScheduleTask.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/JobScheduleTask.java
@@ -62,7 +62,6 @@ public class JobScheduleTask extends QuartzJobBean {
"Job schedule does not exist in db or process schedule
offline,delete schedule job in quartz, scheduleId:{}.",
scheduleId);
deleteJob(context, scheduleId);
- return;
}
// start flink job
// .......
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/QuartzExecutors.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/QuartzExecutors.java
index 2ba57af09..dd1486ffd 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/QuartzExecutors.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/QuartzExecutors.java
@@ -29,6 +29,7 @@ import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.TriggerKey;
+import org.springframework.beans.factory.annotation.Autowired;
import java.util.Date;
import java.util.Map;
@@ -42,13 +43,9 @@ import static org.quartz.TriggerBuilder.newTrigger;
@Slf4j
public class QuartzExecutors implements SchedulerApi {
- private final Scheduler scheduler;
+ @Autowired Scheduler scheduler;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
- public QuartzExecutors(Scheduler scheduler) {
- this.scheduler = scheduler;
- }
-
@SneakyThrows
@Override
public void start() {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SchedulerService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SchedulerService.java
index 4e5f6583b..da8182400 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SchedulerService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SchedulerService.java
@@ -17,8 +17,8 @@
package org.apache.streampark.console.core.service;
+import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.core.entity.Schedule;
-import org.apache.streampark.console.core.enums.ReleaseState;
import org.apache.streampark.console.core.enums.ScheduleState;
import com.baomidou.mybatisplus.core.metadata.IPage;
@@ -32,13 +32,13 @@ public interface SchedulerService extends IService<Schedule> {
boolean updateSchedule(Long appId, String scheduleExpression);
-  Map<String, Object> setScheduleState(Long appId, ReleaseState scheduleStatus);
+ void setScheduleState(Long appId, ScheduleState scheduleStatus);
- boolean deleteSchedule(int scheduleId);
+ void deleteSchedule(int scheduleId);
Map<String, Object> previewSchedule(String schedule);
- IPage<Schedule> page(Schedule savePoint, ScheduleState request);
+ IPage<Schedule> page(Schedule schedule, RestRequest request);
- Schedule querySchedule(int id);
+ Schedule querySchedule(int appId);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SchedulerServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SchedulerServiceImpl.java
index 2714eb1eb..f9b38b518 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SchedulerServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SchedulerServiceImpl.java
@@ -17,41 +17,160 @@
package org.apache.streampark.console.core.service.impl;
+import org.apache.streampark.common.util.DateUtils;
+import org.apache.streampark.common.util.JsonUtils;
+import org.apache.streampark.console.base.domain.Constant;
+import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
+import org.apache.streampark.console.core.bean.ScheduleParam;
import org.apache.streampark.console.core.entity.Schedule;
-import org.apache.streampark.console.core.enums.ReleaseState;
import org.apache.streampark.console.core.enums.ScheduleState;
import org.apache.streampark.console.core.mapper.SchedulerMapper;
+import org.apache.streampark.console.core.quartz.SchedulerApi;
import org.apache.streampark.console.core.service.SchedulerService;
+import org.apache.commons.lang3.StringUtils;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+import java.util.Date;
import java.util.Map;
+import java.util.Objects;
/** scheduler service impl */
@Service
+@Slf4j
public class SchedulerServiceImpl extends ServiceImpl<SchedulerMapper, Schedule>
implements SchedulerService {
+ @Autowired private SchedulerMapper schedulerMapper;
+
+ @Autowired private SchedulerApi schedulerApi;
+
@Override
+ @Transactional
public boolean insertSchedule(Long appId, String schedule) {
+ Schedule scheduleObj = new Schedule();
+ Date now = new Date();
+
+ scheduleObj.setAppId(appId);
+    ScheduleParam scheduleParam = JsonUtils.read(schedule, ScheduleParam.class);
+    if (DateUtils.differSec(scheduleParam.getStartTime(), scheduleParam.getEndTime()) == 0) {
+      log.warn("The start time must not be the same as the end time");
+      return false;
+    }
+    if (scheduleParam.getStartTime().getTime() > scheduleParam.getEndTime().getTime()) {
+      log.warn("The start time must be earlier than the end time");
+      return false;
+    }
+ scheduleObj.setStartTime(scheduleParam.getStartTime());
+ scheduleObj.setEndTime(scheduleParam.getEndTime());
+    if (!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) {
+      log.error("Invalid cron expression: {}", scheduleParam.getCrontab());
+      return false;
+    }
+ scheduleObj.setCrontab(scheduleParam.getCrontab());
+ scheduleObj.setTimezoneId(scheduleParam.getTimezoneId());
+ scheduleObj.setCreateTime(now);
+ scheduleObj.setUpdateTime(now);
+ scheduleObj.setScheduleState(ScheduleState.OFFLINE);
+ schedulerMapper.insert(scheduleObj);
+
-    return false;
+    return true;
}
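Validation defers to Quartz's own parser: a Quartz cron expression has six or seven fields starting with seconds, and exactly one of the day-of-month/day-of-week fields must be '?'. A quick illustration (assumes quartz is on the classpath):

import org.quartz.CronExpression;

// true: fire every day at 02:00:00
boolean ok = CronExpression.isValidExpression("0 0 2 * * ?");
// false: five-field unix syntax is rejected by Quartz
boolean bad = CronExpression.isValidExpression("0 2 * * *");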
@Override
+ @Transactional
public boolean updateSchedule(Long appId, String scheduleExpression) {
- return false;
+ // check schedule exists
+ Schedule schedule = schedulerMapper.selectById(appId);
+
+ if (schedule == null) {
+ return false;
+ }
+
+ Date now = new Date();
+
+ if (!StringUtils.isEmpty(scheduleExpression)) {
+      ScheduleParam scheduleParam = JsonUtils.read(scheduleExpression, ScheduleParam.class);
+ if (scheduleParam == null) {
+ return false;
+ }
+      if (DateUtils.differSec(scheduleParam.getStartTime(), scheduleParam.getEndTime()) == 0) {
+        log.warn("The start time must not be the same as the end time");
+        return false;
+      }
+      if (scheduleParam.getStartTime().getTime() > scheduleParam.getEndTime().getTime()) {
+        log.warn("The start time must be earlier than the end time");
+        return false;
+      }
+
+ schedule.setStartTime(scheduleParam.getStartTime());
+ schedule.setEndTime(scheduleParam.getEndTime());
+      if (!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) {
+        log.warn("Invalid cron expression: {}", scheduleParam.getCrontab());
+        return false;
+      }
+ schedule.setCrontab(scheduleParam.getCrontab());
+ schedule.setTimezoneId(scheduleParam.getTimezoneId());
+ }
+
+ schedule.setUpdateTime(now);
+ schedulerMapper.updateById(schedule);
+
+ return true;
}
@Override
-  public Map<String, Object> setScheduleState(Long appId, ReleaseState scheduleStatus) {
-    return null;
+ @Transactional
+ public void setScheduleState(Long appId, ScheduleState scheduleStatus) {
+ // check schedule exists
+ Schedule scheduleObj = schedulerMapper.selectById(appId);
+
+    if (scheduleObj == null) {
+      log.error("Schedule does not exist, appId:{}.", appId);
+      return;
+    }
+ // check schedule release state
+    if (scheduleObj.getScheduleState() == scheduleStatus) {
+      log.warn(
+          "Schedule state is already {}, no change needed, scheduleId:{}.",
+          scheduleObj.getScheduleState().getDescp(),
+          scheduleObj.getId());
+      return;
+    }
+ scheduleObj.setScheduleState(scheduleStatus);
+
+ schedulerMapper.updateById(scheduleObj);
+ try {
+ switch (scheduleStatus) {
+ case ONLINE:
+ log.info("Call master client set schedule online");
+ setSchedule(scheduleObj);
+ break;
+ case OFFLINE:
+ log.info("Call master client set schedule offline");
+ deleteSchedule(scheduleObj.getId());
+ break;
+ default:
+ }
+    } catch (Exception e) {
+      log.error("Failed to update quartz schedule task, scheduleId:{}.", scheduleObj.getId(), e);
+    }
+ }
+
+ public void setSchedule(Schedule schedule) {
+ log.info("set schedule, scheduleId: {}", schedule.getId());
+ schedulerApi.insertOrUpdateScheduleTask(schedule);
}
@Override
- public boolean deleteSchedule(int scheduleId) {
- return false;
+ public void deleteSchedule(int scheduleId) {
+ log.info("delete schedules of schedule id:{}", scheduleId);
+ schedulerApi.deleteScheduleTask(scheduleId);
}
@Override
@@ -60,12 +179,16 @@ public class SchedulerServiceImpl extends ServiceImpl<SchedulerMapper, Schedule>
}
@Override
- public IPage<Schedule> page(Schedule savePoint, ScheduleState request) {
- return null;
+ public IPage<Schedule> page(Schedule schedule, RestRequest request) {
+    Page<Schedule> page =
+        new MybatisPager<Schedule>().getPage(request, "option_time", Constant.ORDER_DESC);
+    LambdaQueryWrapper<Schedule> queryWrapper =
+        new LambdaQueryWrapper<Schedule>().eq(Schedule::getAppId, schedule.getAppId());
+ return this.page(page, queryWrapper);
}
@Override
- public Schedule querySchedule(int id) {
- return getById(id);
+ public Schedule querySchedule(int appId) {
+ return schedulerMapper.queryByAppId(appId);
}
}
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/SchedulerMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/SchedulerMapper.xml
new file mode 100644
index 000000000..818a985b3
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/SchedulerMapper.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
+<mapper namespace="org.apache.streampark.console.core.mapper.SchedulerMapper">
+    <sql id="baseSql">
+        id, appId, start_time, end_time, timezone_id, crontab, create_time, update_time, schedule_state
+    </sql>
+
+    <select id="queryByAppId" resultType="org.apache.streampark.console.core.entity.Schedule">
+ select
+ <include refid="baseSql"/>
+ from t_flink_schedules
+ where appId = #{appId} and schedule_state = 1
+ </select>
+</mapper>
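queryByAppId backs SchedulerServiceImpl.querySchedule above; note it only returns rows with schedule_state = 1, which presumably denotes the online state. A minimal caller sketch:

// look up the active schedule bound to a Flink application, if any
Schedule schedule = schedulerMapper.queryByAppId(appId);
if (schedule != null) {
  schedulerApi.insertOrUpdateScheduleTask(schedule);
}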
diff --git a/streampark-flink/streampark-flink-catalog/pom.xml b/streampark-flink/streampark-flink-catalog/pom.xml
new file mode 100644
index 000000000..1eb8f4d14
--- /dev/null
+++ b/streampark-flink/streampark-flink-catalog/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>streampark</artifactId>
+ <groupId>org.apache.streampark</groupId>
+ <version>2.2.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>streampark-flink-catalog</artifactId>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-scala_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>${flink.table.uber.artifact.id}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ </dependencies>
+
+</project>
diff --git a/streampark-flink/streampark-flink-catalog/src/main/java/catalog/StreamParkCatalog.java b/streampark-flink/streampark-flink-catalog/src/main/java/catalog/StreamParkCatalog.java
new file mode 100644
index 000000000..7ddc127b9
--- /dev/null
+++ b/streampark-flink/streampark-flink-catalog/src/main/java/catalog/StreamParkCatalog.java
@@ -0,0 +1,263 @@
+package catalog;
+
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class StreamParkCatalog implements Catalog {
+ private final String catalogName;
+ private final Map<String, String> options;
+ private final CatalogResourceDao dao;
+ private final TableEnvClassLoader tEnvClassLoader;
+
+ private final ConnectorClassLoaderProvider connectorClassLoaderProvider;
+
+ private final UdfClassLoaderProvider udfClassLoaderProvider;
+
+  public StreamParkCatalog(
+      String catalogName,
+      Map<String, String> options,
+      CatalogResourceDao dao,
+      TableEnvClassLoader tEnvClassLoader,
+      @Nullable ConnectorClassLoaderProvider connectorClassLoaderProvider,
+      @Nullable UdfClassLoaderProvider udfClassLoaderProvider) {
+    Objects.requireNonNull(catalogName);
+    Objects.requireNonNull(options);
+    Objects.requireNonNull(dao);
+    Objects.requireNonNull(tEnvClassLoader);
+    if (!options.containsKey("defaultDatabase")) {
+      throw new CatalogException(
+          String.format("Catalog `%s` is missing required property 'defaultDatabase'.", catalogName));
+    } else if (options.get("defaultDatabase").trim().isEmpty()) {
+      throw new CatalogException(
+          String.format("'defaultDatabase' property of catalog `%s` must not be empty or blank.", catalogName));
+    } else {
+      this.catalogName = catalogName;
+      this.options = options;
+      this.dao = dao;
+      this.tEnvClassLoader = tEnvClassLoader;
+      this.connectorClassLoaderProvider = connectorClassLoaderProvider;
+      this.udfClassLoaderProvider = udfClassLoaderProvider;
+      if (udfClassLoaderProvider != null) {
+        this.tEnvClassLoader.addClassLoader(udfClassLoaderProvider.createUdfClassLoader(catalogName));
+      }
+      this.createDefaultDatabase();
+    }
+  }
+
+  @Override
+  public void open() throws CatalogException {}
+
+  @Override
+  public void close() throws CatalogException {}
+
+  @Override
+  public String getDefaultDatabase() throws CatalogException {
+    return this.options.get("defaultDatabase");
+  }
+
+  @Override
+  public List<String> listDatabases() throws CatalogException {
+    // only the configured default database is known locally; a dao-backed lookup is presumably still to come
+    return Collections.singletonList(getDefaultDatabase());
+  }
+
+  @Override
+  public CatalogDatabase getDatabase(String s) throws DatabaseNotExistException, CatalogException {
+    return null;
+  }
+
+  @Override
+  public boolean databaseExists(String s) throws CatalogException {
+    return false;
+  }
+
+  @Override
+  public void createDatabase(String s, CatalogDatabase catalogDatabase, boolean b)
+      throws DatabaseAlreadyExistException, CatalogException {}
+
+  @Override
+  public void dropDatabase(String s, boolean b, boolean b1)
+      throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {}
+
+  @Override
+  public void alterDatabase(String s, CatalogDatabase catalogDatabase, boolean b)
+      throws DatabaseNotExistException, CatalogException {}
+
+  @Override
+  public List<String> listTables(String s) throws DatabaseNotExistException, CatalogException {
+    return null;
+  }
+
+  @Override
+  public List<String> listViews(String s) throws DatabaseNotExistException, CatalogException {
+    return null;
+  }
+
+  @Override
+  public CatalogBaseTable getTable(ObjectPath objectPath) throws TableNotExistException, CatalogException {
+    return null;
+  }
+
+  @Override
+  public boolean tableExists(ObjectPath objectPath) throws CatalogException {
+    return false;
+  }
+
+  @Override
+  public void dropTable(ObjectPath objectPath, boolean b) throws TableNotExistException, CatalogException {}
+
+  @Override
+  public void renameTable(ObjectPath objectPath, String s, boolean b)
+      throws TableNotExistException, TableAlreadyExistException, CatalogException {}
+
+  @Override
+  public void createTable(ObjectPath objectPath, CatalogBaseTable catalogBaseTable, boolean b)
+      throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {}
+
+  @Override
+  public void alterTable(ObjectPath objectPath, CatalogBaseTable catalogBaseTable, boolean b)
+      throws TableNotExistException, CatalogException {}
+
+  @Override
+  public List<CatalogPartitionSpec> listPartitions(ObjectPath objectPath)
+      throws TableNotExistException, TableNotPartitionedException, CatalogException {
+    return null;
+  }
+
+  @Override
+  public List<CatalogPartitionSpec> listPartitions(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec)
+      throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException {
+    return null;
+  }
+
+  @Override
+  public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath objectPath, List<Expression> list)
+      throws TableNotExistException, TableNotPartitionedException, CatalogException {
+    return null;
+  }
+
+  @Override
+  public CatalogPartition getPartition(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec)
+      throws PartitionNotExistException, CatalogException {
+    return null;
+  }
+
+  @Override
+  public boolean partitionExists(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec)
+      throws CatalogException {
+    return false;
+  }
+
+  @Override
+  public void createPartition(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec, CatalogPartition catalogPartition, boolean b)
+      throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {}
+
+  @Override
+  public void dropPartition(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec, boolean b)
+      throws PartitionNotExistException, CatalogException {}
+
+  @Override
+  public void alterPartition(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec, CatalogPartition catalogPartition, boolean b)
+      throws PartitionNotExistException, CatalogException {}
+
+  @Override
+  public List<String> listFunctions(String s) throws DatabaseNotExistException, CatalogException {
+    return null;
+  }
+
+  @Override
+  public CatalogFunction getFunction(ObjectPath objectPath) throws FunctionNotExistException, CatalogException {
+    return null;
+  }
+
+  @Override
+  public boolean functionExists(ObjectPath objectPath) throws CatalogException {
+    return false;
+  }
+
+  @Override
+  public void createFunction(ObjectPath objectPath, CatalogFunction catalogFunction, boolean b)
+      throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {}
+
+  @Override
+  public void alterFunction(ObjectPath objectPath, CatalogFunction catalogFunction, boolean b)
+      throws FunctionNotExistException, CatalogException {}
+
+  @Override
+  public void dropFunction(ObjectPath objectPath, boolean b) throws FunctionNotExistException, CatalogException {}
+
+  @Override
+  public CatalogTableStatistics getTableStatistics(ObjectPath objectPath)
+      throws TableNotExistException, CatalogException {
+    return null;
+  }
+
+  @Override
+  public CatalogColumnStatistics getTableColumnStatistics(ObjectPath objectPath)
+      throws TableNotExistException, CatalogException {
+    return null;
+  }
+
+  @Override
+  public CatalogTableStatistics getPartitionStatistics(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec)
+      throws PartitionNotExistException, CatalogException {
+    return null;
+  }
+
+  @Override
+  public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec)
+      throws PartitionNotExistException, CatalogException {
+    return null;
+  }
+
+  @Override
+  public void alterTableStatistics(ObjectPath objectPath, CatalogTableStatistics catalogTableStatistics, boolean b)
+      throws TableNotExistException, CatalogException {}
+
+  @Override
+  public void alterTableColumnStatistics(ObjectPath objectPath, CatalogColumnStatistics catalogColumnStatistics, boolean b)
+      throws TableNotExistException, CatalogException, TablePartitionedException {}
+
+  @Override
+  public void alterPartitionStatistics(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec, CatalogTableStatistics catalogTableStatistics, boolean b)
+      throws PartitionNotExistException, CatalogException {}
+
+  @Override
+  public void alterPartitionColumnStatistics(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec, CatalogColumnStatistics catalogColumnStatistics, boolean b)
+      throws PartitionNotExistException, CatalogException {}
+}
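Once the helper types referenced above (TableEnvClassLoader and the classloader providers) are available, the catalog would register with a TableEnvironment like any Flink catalog. A hypothetical wiring sketch, with dao and tEnvClassLoader assumed to be constructed elsewhere:

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
Map<String, String> options = new HashMap<>();
options.put("defaultDatabase", "default");
Catalog catalog = new StreamParkCatalog("streampark", options, dao, tEnvClassLoader, null, null);
tableEnv.registerCatalog("streampark", catalog);
tableEnv.useCatalog("streampark");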
diff --git a/streampark-flink/streampark-flink-catalog/src/main/java/catalog/dao/CatalogResourceDao.java b/streampark-flink/streampark-flink-catalog/src/main/java/catalog/dao/CatalogResourceDao.java
new file mode 100644
index 000000000..5e675784a
--- /dev/null
+++ b/streampark-flink/streampark-flink-catalog/src/main/java/catalog/dao/CatalogResourceDao.java
@@ -0,0 +1,9 @@
+package catalog.dao;
+
+import java.util.List;
+import java.util.Optional;
+
+public interface CatalogResourceDao {
+ List<Database> listDatabases(String catalogName);
+
+}