This is an automated email from the ASF dual-hosted git repository. shenlin pushed a commit to branch runtimer in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
commit 62b6c05070b4aff32272f0f727746cd8c1f778e0 Author: wangkai <[email protected]> AuthorDate: Mon May 22 17:32:06 2023 +0800 1.add embedded database 2.add default validateFilter --- adapter/persistence/pom.xml | 5 + .../adapter/persistence/DatasourceConfig.java | 110 +++++++++++++++++++++ .../adapter/persistence/PersistenceConfig.java | 2 - .../MybatisEventTargetRunnerRepository.java | 5 + .../main/resources/db/migration/V1__baseline.sql | 76 +++++++------- .../main/resources/db/migration/V2__baseline.sql | 38 +++---- .../migration/V3__change_target_transform_type.sql | 30 +++--- .../db/migration/V4__register_source_acs_mns.sql | 30 +++--- .../migration/V5__register_target_acs_dingtalk.sql | 30 +++--- .../V6__register_target_acs_eventbridge.sql | 30 +++--- ...V7__update_event_connection_table_structure.sql | 35 +++---- .../db/migration/V8__update_unique_key.sql | 40 ++++---- .../rocketmq/impl/RocketMQEventDataRepository.java | 6 +- .../eventbridge/enums/props/Constants.java | 37 +++++++ .../ServiceProviderNotFoundException.java | 37 +++++++ infrastructure/pom.xml | 11 +++ .../infrastructure/validate/AuthValidation.java | 42 ++++++++ .../validate/DefaultAuthValidation.java | 47 +++++++++ .../validate/spi/ServiceLifecycle.java | 31 ++++++ .../validate/spi/ValidationServiceFactory.java | 47 +++++++++ .../validate/spi/ValidationServiceLoader.java | 91 +++++++++++++++++ .../validate/spi/annotation/SingletonSPI.java | 31 ++++++ .../validate/spi/typed/TypedSPI.java | 45 +++++++++ .../validate/spi/typed/TypedSPIRegistry.java | 79 +++++++++++++++ ...ntbridge.infrastructure.validate.AuthValidation | 6 +- .../eventbridge/config/H2ServerConfig.java | 41 ++++++++ .../rocketmq/eventbridge/filter/LoginFilter.java | 26 ++--- .../eventbridge/filter/ValidateFilter.java | 63 ++++++++++++ start/src/main/resources/application.properties | 10 +- 29 files changed, 903 insertions(+), 178 deletions(-) diff --git a/adapter/persistence/pom.xml b/adapter/persistence/pom.xml index dd46427..eac0c8e 100644 --- a/adapter/persistence/pom.xml +++ b/adapter/persistence/pom.xml @@ -53,6 +53,11 @@ <artifactId>mysql-connector-java</artifactId> <version>${mysql-connector-java.version}</version> </dependency> + <dependency> + <groupId>com.h2database</groupId> + <artifactId>h2</artifactId> + </dependency> + <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> diff --git a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/DatasourceConfig.java b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/DatasourceConfig.java new file mode 100644 index 0000000..6718b8d --- /dev/null +++ b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/DatasourceConfig.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.eventbridge.adapter.persistence; + +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import org.apache.ibatis.session.SqlSessionFactory; +import org.mybatis.spring.SqlSessionFactoryBean; +import org.mybatis.spring.SqlSessionTemplate; +import org.mybatis.spring.annotation.MapperScan; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.io.support.PathMatchingResourcePatternResolver; + +import javax.sql.DataSource; + +/** + * DatasourceConfig + */ +@Configuration +@MapperScan(basePackages = "org.apache.rocketmq.eventbridge.adapter.persistence.*.mybatis.*") +public class DatasourceConfig { + private static final String MAPPER_LOCATION = "classpath:mybatis/*.xml"; + + @Value("${spring.datasource.hikari.jdbc-url:jdbc:h2:./rocketmq_eventbridge;MODE=MySQL}") + private String baseUrl; + @Value("${spring.datasource.hikari.driver-class-name:org.h2.Driver}") + private String baseDriverClassName; + @Value("${spring.datasource.hikari.username:sa}") + private String baseUserName; + @Value("${spring.datasource.hikari.password:sa}") + private String basePassword; + + @Value("${spring.datasource.hikari.minimum-idle:5}") + private Integer minIdle; + + @Value("${spring.datasource.hikari.idle-timeout:180000}") + private Long idleTimeoutMs; + + @Value("${spring.datasource.hikari.maximum-pool-size: 10}") + private Integer maxPoolSize; + + @Value("${spring.datasource.hikari.auto-commit: true}") + private Boolean autoCommit; + + @Value("${spring.datasource.hikari.pool-name: hikaricp}") + private String poolName; + + @Value("${spring.datasource.hikari.max-lifetime: 180000}") + private Long maxLifeTime; + + @Value("${spring.datasource.hikari.connection-timeout: 30000}") + private Long connectionTimeoutMs; + + @Value("${spring.datasource.hikari.connection-test-query: select 1}") + private String connectionTestQuery; + + @Value("${spring.datasource.hikari.validation-timeout: 500}") + private Long validationTimeoutMs; + + @Bean("dataSource") + public DataSource getMasterDataSource(){ + HikariConfig hikariConfig = new HikariConfig(); + hikariConfig.setJdbcUrl(baseUrl); + hikariConfig.setDriverClassName(baseDriverClassName); + hikariConfig.setUsername(baseUserName); + hikariConfig.setPassword(basePassword); + hikariConfig.setMinimumIdle(minIdle); + hikariConfig.setIdleTimeout(idleTimeoutMs); + hikariConfig.setMaximumPoolSize(maxPoolSize); + hikariConfig.setAutoCommit(autoCommit); + hikariConfig.setPoolName(poolName); + hikariConfig.setMaxLifetime(maxLifeTime); + hikariConfig.setConnectionTimeout(connectionTimeoutMs); + hikariConfig.setConnectionTestQuery(connectionTestQuery); + hikariConfig.setValidationTimeout(validationTimeoutMs); + return new HikariDataSource(hikariConfig); + } + + @Bean("sqlSessionFactory") + public SqlSessionFactory masterSqlSessionFactory(@Qualifier("dataSource") DataSource dataSource) throws Exception { + SqlSessionFactoryBean bean = new SqlSessionFactoryBean(); + bean.setDataSource(dataSource); + bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(MAPPER_LOCATION)); + return bean.getObject(); + } + + @Bean("sqlSessionTemplate") + public SqlSessionTemplate masterSqlSessionTemplate(@Qualifier("sqlSessionFactory") SqlSessionFactory sqlSessionFactory){ + return new SqlSessionTemplate(sqlSessionFactory); + } + +} + diff --git a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/PersistenceConfig.java b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/PersistenceConfig.java index 8b70fb9..d115ad2 100644 --- a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/PersistenceConfig.java +++ b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/PersistenceConfig.java @@ -20,13 +20,11 @@ package org.apache.rocketmq.eventbridge.adapter.persistence; import lombok.SneakyThrows; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; -import org.mybatis.spring.annotation.MapperScan; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration -@MapperScan(basePackages = "org.apache.rocketmq.eventbridge.adapter.persistence.*.mybatis.*") public class PersistenceConfig { @SneakyThrows diff --git a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java index 4edeecd..0c92aa4 100644 --- a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java +++ b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java @@ -21,6 +21,8 @@ import com.google.gson.Gson; import java.util.List; import java.util.Map; import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.eventbridge.adapter.persistence.target.mybatis.converter.EventTargetRunnerConverter; import org.apache.rocketmq.eventbridge.adapter.persistence.target.mybatis.dataobject.EventTargetRunnerDO; import org.apache.rocketmq.eventbridge.adapter.persistence.target.mybatis.mapper.EventTargetRunnerMapper; @@ -72,6 +74,9 @@ public class MybatisEventTargetRunnerRepository implements EventTargetRunnerRepo @Override public List<EventTargetRunner> listEventTargetRunners(String accountId, String eventBusName, String eventRuleName) { + if (StringUtils.isBlank(accountId) || StringUtils.isBlank(eventBusName) || StringUtils.isBlank(eventRuleName)) { + return Lists.newArrayListWithCapacity(0); + } List<EventTargetRunnerDO> eventTargetRunnerDOS = eventTargetRunnerMapper.listEventTargetRunners(accountId, eventBusName, eventRuleName); if (eventTargetRunnerDOS == null || eventTargetRunnerDOS.isEmpty()) { return Lists.newArrayListWithCapacity(0); diff --git a/adapter/persistence/src/main/resources/db/migration/V1__baseline.sql b/adapter/persistence/src/main/resources/db/migration/V1__baseline.sql index 6681b7e..3909293 100644 --- a/adapter/persistence/src/main/resources/db/migration/V1__baseline.sql +++ b/adapter/persistence/src/main/resources/db/migration/V1__baseline.sql @@ -1,17 +1,19 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ CREATE TABLE IF NOT EXISTS `event_bus` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, @@ -21,11 +23,10 @@ CREATE TABLE IF NOT EXISTS `event_bus` ( `gmt_create` datetime DEFAULT NULL COMMENT 'create time', `gmt_modify` datetime DEFAULT NULL COMMENT 'modify time', PRIMARY KEY (`id`), - UNIQUE KEY `name_uniq_key` (`account_id`,`name`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='event bus meta' -; + UNIQUE KEY `name_uniq_key_event_bus` (`account_id`,`name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; -CREATE TABLE `event_topic` ( +CREATE TABLE IF NOT EXISTS `event_topic` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `account_id` varchar(255) DEFAULT 'SYSTEM' COMMENT 'source account id', `bus` varchar(255) NOT NULL COMMENT 'bus name', @@ -36,9 +37,8 @@ CREATE TABLE `event_topic` ( `gmt_create` datetime DEFAULT NULL COMMENT 'create time', `gmt_modify` datetime DEFAULT NULL COMMENT 'modify time', PRIMARY KEY (`id`), - UNIQUE KEY `name_uniq_key` (`name`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8 -; + UNIQUE KEY `name_uniq_key_event_topicdd` (`name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; CREATE TABLE IF NOT EXISTS `event_source` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, @@ -53,8 +53,8 @@ CREATE TABLE IF NOT EXISTS `event_source` ( `gmt_create` datetime DEFAULT NULL COMMENT 'create time', `gmt_modify` datetime DEFAULT NULL COMMENT 'modify time', PRIMARY KEY (`id`), - UNIQUE KEY `name_uniq_key` (`account_id`,`name`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='event source meta' + UNIQUE KEY `name_uniq_key_event_source` (`account_id`,`name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 ; CREATE TABLE IF NOT EXISTS `event_type` ( @@ -67,12 +67,12 @@ CREATE TABLE IF NOT EXISTS `event_type` ( `gmt_create` datetime DEFAULT NULL COMMENT 'create time', `gmt_modify` datetime DEFAULT NULL COMMENT 'modify time', PRIMARY KEY (`id`), - UNIQUE KEY `name_uniq_key` (`source`,`name`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='event type meta' + UNIQUE KEY `name_uniq_key_event_type` (`source`,`name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 ; CREATE TABLE IF NOT EXISTS `event_rule` ( - `id` bigint(20) NOT NULL AUTO_INCREMENT, + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `account_id` varchar(255) NOT NULL COMMENT 'bus account id', `bus` varchar(255) NOT NULL COMMENT 'bus name', `name` varchar(255) NOT NULL COMMENT 'rule name', @@ -82,8 +82,8 @@ CREATE TABLE IF NOT EXISTS `event_rule` ( `gmt_create` datetime DEFAULT NULL COMMENT 'create time', `gmt_modify` datetime DEFAULT NULL COMMENT 'modify time', UNIQUE KEY `id` (`id`), - UNIQUE KEY `name_uniq_key` (`account_id`,`name`,`bus`) -) ENGINE=InnoDB AUTO_INCREMENT=51815 DEFAULT CHARSET=utf8 COMMENT='event rule meta' + UNIQUE KEY `name_uniq_key_event_rule` (`account_id`,`name`,`bus`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 ; CREATE TABLE IF NOT EXISTS `event_source_runner` ( @@ -96,8 +96,8 @@ CREATE TABLE IF NOT EXISTS `event_source_runner` ( `gmt_create` datetime DEFAULT NULL COMMENT 'create time', `gmt_modify` datetime DEFAULT NULL COMMENT 'modify time', PRIMARY KEY (`id`), - UNIQUE KEY `name_uniq_key` (`account_id`,`bus`,`source`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='event source runner meta' + UNIQUE KEY `name_uniq_key_event_source_runner` (`account_id`,`bus`,`source`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 ; CREATE TABLE IF NOT EXISTS `event_target` ( @@ -112,8 +112,8 @@ CREATE TABLE IF NOT EXISTS `event_target` ( `gmt_create` datetime DEFAULT NULL COMMENT 'create time', `gmt_modify` datetime DEFAULT NULL COMMENT 'modify time', PRIMARY KEY (`id`), - UNIQUE KEY `name_uniq_key` (`account_id`,`bus`,`rule`,`name`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='event target meta' + UNIQUE KEY `name_uniq_key_event_target` (`account_id`,`bus`,`rule`,`name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 ; @@ -128,8 +128,8 @@ CREATE TABLE IF NOT EXISTS `event_target_runner` ( `gmt_create` datetime DEFAULT NULL COMMENT 'create time', `gmt_modify` datetime DEFAULT NULL COMMENT 'modify time', PRIMARY KEY (`id`), - UNIQUE KEY `name_uniq_key` (`account_id`,`bus`,`rule`,`target`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='event target runner meta' + UNIQUE KEY `name_uniq_key_event_target_runner` (`account_id`,`bus`,`rule`,`target`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 ; @@ -144,8 +144,8 @@ CREATE TABLE IF NOT EXISTS `event_source_class` ( `gmt_create` datetime DEFAULT NULL COMMENT 'create time', `gmt_modify` datetime DEFAULT NULL COMMENT 'modify time', PRIMARY KEY (`id`), - UNIQUE KEY `name_uniq_key` (`name`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='event source class meta' + UNIQUE KEY `name_uniq_key_event_source_class` (`name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 ; @@ -160,6 +160,6 @@ CREATE TABLE IF NOT EXISTS `event_target_class` ( `gmt_create` datetime DEFAULT NULL COMMENT 'create time', `gmt_modify` datetime DEFAULT NULL COMMENT 'modify time', PRIMARY KEY (`id`), - UNIQUE KEY `name_uniq_key` (`name`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='event target class meta' + UNIQUE KEY `name_uniq_key_event_target_class` (`name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 ; diff --git a/adapter/persistence/src/main/resources/db/migration/V2__baseline.sql b/adapter/persistence/src/main/resources/db/migration/V2__baseline.sql index 1d04ec5..c70912c 100644 --- a/adapter/persistence/src/main/resources/db/migration/V2__baseline.sql +++ b/adapter/persistence/src/main/resources/db/migration/V2__baseline.sql @@ -1,17 +1,19 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ CREATE TABLE IF NOT EXISTS `event_connection` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, @@ -25,8 +27,8 @@ CREATE TABLE IF NOT EXISTS `event_connection` ( `gmt_create` datetime DEFAULT NULL COMMENT 'create time', `gmt_modify` datetime DEFAULT NULL COMMENT 'modify time', PRIMARY KEY (`id`), - UNIQUE KEY `name_uniq_key` (`name`) - ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='event connection meta' + UNIQUE KEY `name_uniq_key_event_connection` (`name`) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8 ; @@ -42,6 +44,6 @@ CREATE TABLE IF NOT EXISTS `event_api_destination` ( `gmt_create` datetime DEFAULT NULL COMMENT 'create time', `gmt_modify` datetime DEFAULT NULL COMMENT 'modify time', PRIMARY KEY (`id`), - UNIQUE KEY `name_uniq_key` (`name`) - ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='event api destination meta' + UNIQUE KEY `name_uniq_key_event_api_destination` (`name`) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8 ; diff --git a/adapter/persistence/src/main/resources/db/migration/V3__change_target_transform_type.sql b/adapter/persistence/src/main/resources/db/migration/V3__change_target_transform_type.sql index edaf379..b2a18c2 100644 --- a/adapter/persistence/src/main/resources/db/migration/V3__change_target_transform_type.sql +++ b/adapter/persistence/src/main/resources/db/migration/V3__change_target_transform_type.sql @@ -1,17 +1,19 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ ALTER TABLE `event_target_class` CHANGE COLUMN `target_transform` `target_transform` TEXT NULL DEFAULT NULL ; diff --git a/adapter/persistence/src/main/resources/db/migration/V4__register_source_acs_mns.sql b/adapter/persistence/src/main/resources/db/migration/V4__register_source_acs_mns.sql index 7e826f0..1ad499a 100644 --- a/adapter/persistence/src/main/resources/db/migration/V4__register_source_acs_mns.sql +++ b/adapter/persistence/src/main/resources/db/migration/V4__register_source_acs_mns.sql @@ -1,16 +1,18 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ INSERT INTO `event_source_class` (`name`,`api_params`,`required_params`,`transform`,`visual_config`,`description`,`gmt_create`,`gmt_modify`) VALUES ('acs.mns','{\n \"RegionId\":{\n \"type\":\"String\",\n \"desc\":\"the region of aliyun mns.\",\n \"required\":true,\n \"defaultValue\":\"\"\n },\n \"QueueName\":{\n \"type\":\"String\",\n \"desc\":\"the queue name of aliyun mns.\",\n \"required\":true,\n \"defaultValue\":\"\"\n [...] diff --git a/adapter/persistence/src/main/resources/db/migration/V5__register_target_acs_dingtalk.sql b/adapter/persistence/src/main/resources/db/migration/V5__register_target_acs_dingtalk.sql index 31291a1..1d8a643 100644 --- a/adapter/persistence/src/main/resources/db/migration/V5__register_target_acs_dingtalk.sql +++ b/adapter/persistence/src/main/resources/db/migration/V5__register_target_acs_dingtalk.sql @@ -1,17 +1,19 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ INSERT INTO `event_target_class` (`name`,`api_params`,`target_transform`,`required_params`,`visual_config`,`description`,`gmt_create`,`gmt_modify`) VALUES diff --git a/adapter/persistence/src/main/resources/db/migration/V6__register_target_acs_eventbridge.sql b/adapter/persistence/src/main/resources/db/migration/V6__register_target_acs_eventbridge.sql index e4c20c1..b7587a0 100644 --- a/adapter/persistence/src/main/resources/db/migration/V6__register_target_acs_eventbridge.sql +++ b/adapter/persistence/src/main/resources/db/migration/V6__register_target_acs_eventbridge.sql @@ -1,17 +1,19 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ INSERT INTO `event_target_class` (`name`,`api_params`,`target_transform`,`required_params`,`visual_config`,`description`,`gmt_create`,`gmt_modify`) VALUES diff --git a/adapter/persistence/src/main/resources/db/migration/V7__update_event_connection_table_structure.sql b/adapter/persistence/src/main/resources/db/migration/V7__update_event_connection_table_structure.sql index 82be349..e98bbf0 100644 --- a/adapter/persistence/src/main/resources/db/migration/V7__update_event_connection_table_structure.sql +++ b/adapter/persistence/src/main/resources/db/migration/V7__update_event_connection_table_structure.sql @@ -1,18 +1,19 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ -ALTER TABLE `event_connection` -MODIFY COLUMN `authorization_type` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT '' COMMENT '授权类型' AFTER `name`, -MODIFY COLUMN `auth_parameters` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL AFTER `authorization_type`; \ No newline at end of file +--ALTER TABLE `event_connection` MODIFY COLUMN `authorization_type` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT '' COMMENT '授权类型' AFTER `name`; +--ALTER TABLE `event_connection` MODIFY COLUMN `auth_parameters` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL AFTER `authorization_type`; \ No newline at end of file diff --git a/adapter/persistence/src/main/resources/db/migration/V8__update_unique_key.sql b/adapter/persistence/src/main/resources/db/migration/V8__update_unique_key.sql index f25bfb0..2e9098e 100644 --- a/adapter/persistence/src/main/resources/db/migration/V8__update_unique_key.sql +++ b/adapter/persistence/src/main/resources/db/migration/V8__update_unique_key.sql @@ -1,22 +1,22 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ -ALTER TABLE `event_connection` -DROP INDEX `name_uniq_key`, -ADD UNIQUE INDEX `name_uniq_key`(`name`, `account_id`) USING BTREE; +ALTER TABLE `event_connection` DROP INDEX `name_uniq_key_event_connection`; +ALTER TABLE `event_connection` ADD UNIQUE INDEX `name_uniq_key_event_connection`(`name`, `account_id`) USING BTREE; -ALTER TABLE `event_api_destination` -DROP INDEX `name_uniq_key`, -ADD UNIQUE INDEX `name_uniq_key`(`name`, `account_id`) USING BTREE; \ No newline at end of file +ALTER TABLE `event_api_destination` DROP INDEX `name_uniq_key_event_api_destination`; +ALTER TABLE `event_api_destination` ADD UNIQUE INDEX `name_uniq_key_event_api_destination`(`name`, `account_id`) USING BTREE; \ No newline at end of file diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java index 4e6f64e..320c69e 100644 --- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java +++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java @@ -20,6 +20,7 @@ package org.apache.rocketmq.eventbridge.adapter.storage.rocketmq.impl; import com.google.gson.Gson; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.eventbridge.adapter.persistence.data.mybatis.dataobject.EventTopicDO; @@ -92,7 +93,10 @@ public class RocketMQEventDataRepository implements EventDataRepository { @Cacheable(value = "topicCache") @Override public String getTopicName(String accountId, String eventBusName) { - String topicName = null; + String topicName = eventDataOnRocketMQConnectAPI.buildTopicName(accountId, eventBusName); + if (StringUtils.isBlank(AppConfig.getGlobalConfig().getDefaultDataPersistentClusterName())) { + return topicName; + } EventTopicDO eventTopicDO = eventTopicMapper.getTopic(accountId, eventBusName); if (eventTopicDO != null) { topicName = eventTopicDO.getName(); diff --git a/common/src/main/java/org/apache/rocketmq/eventbridge/enums/props/Constants.java b/common/src/main/java/org/apache/rocketmq/eventbridge/enums/props/Constants.java new file mode 100644 index 0000000..dfd1d30 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/eventbridge/enums/props/Constants.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.eventbridge.enums.props; + +/** + * Constants + */ +public enum Constants { + HEADER_KEY_RESOURCE_OWNER_ACCOUNT_ID("resourceOwnerAccountId"), + HEADER_KEY_LOGIN_ACCOUNT_ID("loginAccountId"), + HEADER_KEY_PARENT_ACCOUNT_ID("parentAccountId"), + ; + + private String name; + + Constants(String name) { + this.name = name; + } + + public String getName() { + return this.name; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/eventbridge/exception/ServiceProviderNotFoundException.java b/common/src/main/java/org/apache/rocketmq/eventbridge/exception/ServiceProviderNotFoundException.java new file mode 100644 index 0000000..37f7b3f --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/eventbridge/exception/ServiceProviderNotFoundException.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.exception; + + +/** + * Service provider not found exception. + * + */ +public final class ServiceProviderNotFoundException extends EventBridgeException { + + private static final long serialVersionUID = -3730257541332863236L; + + private static final String ERROR_CATEGORY = "SPI"; + + private static final int ERROR_CODE = 500; + + public ServiceProviderNotFoundException(final Class<?> clazz, final String type) { + super(String.format("%s-%05d: %s %s", ERROR_CATEGORY, ERROR_CODE, String.format("No implementation class load from SPI `%s`:", clazz.getName()), type)); + } + +} diff --git a/infrastructure/pom.xml b/infrastructure/pom.xml index d419a45..5db59ef 100644 --- a/infrastructure/pom.xml +++ b/infrastructure/pom.xml @@ -26,4 +26,15 @@ <maven.compiler.target>8</maven.compiler.target> </properties> + <dependencies> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-eventbridge-common</artifactId> + </dependency> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-webflux</artifactId> + </dependency> + </dependencies> + </project> \ No newline at end of file diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/AuthValidation.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/AuthValidation.java new file mode 100644 index 0000000..c2a86a1 --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/AuthValidation.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.infrastructure.validate; + + +import org.apache.rocketmq.eventbridge.infrastructure.validate.spi.ServiceLifecycle; +import org.apache.rocketmq.eventbridge.infrastructure.validate.spi.typed.TypedSPI; +import org.springframework.http.server.reactive.ServerHttpRequest; +import reactor.util.context.Context; + +/** + * AuthValidation + */ +public interface AuthValidation extends ServiceLifecycle, TypedSPI { + + Context validate(ServerHttpRequest request, Context ctx); + + @Override + default void init() { + + } + + @Override + default void shutdown(){ + + } +} diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/DefaultAuthValidation.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/DefaultAuthValidation.java new file mode 100644 index 0000000..f85517d --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/DefaultAuthValidation.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.infrastructure.validate; + +import org.springframework.http.server.reactive.ServerHttpRequest; +import reactor.util.context.Context; + +import java.util.List; + +import static org.apache.rocketmq.eventbridge.enums.props.Constants.HEADER_KEY_RESOURCE_OWNER_ACCOUNT_ID; + +/** + * DefaultAuthValidation + */ +public class DefaultAuthValidation implements AuthValidation { + + @Override + public Context validate(ServerHttpRequest request, Context ctx) { + String resourceOwnerId = "defaultResourceOwnerId"; + List<String> resourceOwnerIds = request.getHeaders().get(HEADER_KEY_RESOURCE_OWNER_ACCOUNT_ID); + if (resourceOwnerIds != null && !resourceOwnerIds.isEmpty()) { + //throw new EventBridgeException(DefaultErrorCode.LoginFailed); + resourceOwnerId = resourceOwnerIds.get(0); + } + return ctx.put(HEADER_KEY_RESOURCE_OWNER_ACCOUNT_ID.getName(), resourceOwnerId); + } + + @Override + public String getType() { + return "default"; + } +} diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/ServiceLifecycle.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/ServiceLifecycle.java new file mode 100644 index 0000000..517264c --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/ServiceLifecycle.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.rocketmq.eventbridge.infrastructure.validate.spi; + +/** + * ServiceLifecycle + * + */ +public interface ServiceLifecycle { + /** + * Used for startup or initialization of a service endpoint. A service endpoint instance will be in a ready state + * after this method has been completed. + */ + void init(); + + /** + * Notify a service instance of the end of its life cycle. Once this method completes, the service endpoint could be + * destroyed and eligible for garbage collection. + */ + void shutdown(); +} \ No newline at end of file diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/ValidationServiceFactory.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/ValidationServiceFactory.java new file mode 100644 index 0000000..e467e9e --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/ValidationServiceFactory.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.infrastructure.validate.spi; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import org.apache.rocketmq.eventbridge.infrastructure.validate.AuthValidation; +import org.apache.rocketmq.eventbridge.infrastructure.validate.spi.typed.TypedSPIRegistry; + +/** + * validation service load factory. + * + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class ValidationServiceFactory { + static { + ValidationServiceLoader.register(AuthValidation.class); + } + + /** + * Get instance of cluster persist repository. + * + * @param type persist repository configuration + * @return got instance + */ + public static AuthValidation getInstance(final String type) { + AuthValidation result = TypedSPIRegistry.getRegisteredService(AuthValidation.class, type); + result.init(); + return result; + } + +} diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/ValidationServiceLoader.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/ValidationServiceLoader.java new file mode 100644 index 0000000..a2c220e --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/ValidationServiceLoader.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.infrastructure.validate.spi; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.SneakyThrows; +import org.apache.rocketmq.eventbridge.infrastructure.validate.spi.annotation.SingletonSPI; + +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedList; +import java.util.Map; +import java.util.ServiceLoader; +import java.util.concurrent.ConcurrentHashMap; + +/** + * validation service loader. + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class ValidationServiceLoader { + + private static final Map<Class<?>, Collection<Object>> SERVICES = new ConcurrentHashMap<>(); + + /** + * Register service. + * + * @param serviceInterface service interface + */ + public static void register(final Class<?> serviceInterface) { + if (!SERVICES.containsKey(serviceInterface)) { + SERVICES.put(serviceInterface, load(serviceInterface)); + } + } + + private static <T> Collection<Object> load(final Class<T> serviceInterface) { + Collection<Object> result = new LinkedList<>(); + for (T each : ServiceLoader.load(serviceInterface)) { + result.add(each); + } + return result; + } + + /** + * Get service instances. + * + * @param serviceInterface service interface + * @param <T> type of service + * @return service instances + */ + public static <T> Collection<T> getServiceInstances(final Class<T> serviceInterface) { + return null == serviceInterface.getAnnotation(SingletonSPI.class) ? createNewServiceInstances(serviceInterface) : getSingletonServiceInstances(serviceInterface); + } + + @SneakyThrows(ReflectiveOperationException.class) + @SuppressWarnings("unchecked") + private static <T> Collection<T> createNewServiceInstances(final Class<T> serviceInterface) { + if (!SERVICES.containsKey(serviceInterface)) { + return Collections.emptyList(); + } + Collection<Object> services = SERVICES.get(serviceInterface); + if (services.isEmpty()) { + return Collections.emptyList(); + } + Collection<T> result = new LinkedList<>(); + for (Object each : services) { + result.add((T) each.getClass().getDeclaredConstructor().newInstance()); + } + return result; + } + + @SuppressWarnings("unchecked") + private static <T> Collection<T> getSingletonServiceInstances(final Class<T> serviceInterface) { + return (Collection<T>) SERVICES.getOrDefault(serviceInterface, Collections.emptyList()); + } +} diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/annotation/SingletonSPI.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/annotation/SingletonSPI.java new file mode 100644 index 0000000..cd61305 --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/annotation/SingletonSPI.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.infrastructure.validate.spi.annotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotation of singleton SPI. + */ +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +public @interface SingletonSPI { +} diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/typed/TypedSPI.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/typed/TypedSPI.java new file mode 100644 index 0000000..65a0cbb --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/typed/TypedSPI.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.infrastructure.validate.spi.typed; + +import java.util.Collection; +import java.util.Collections; + +/** + * Typed SPI. + */ +public interface TypedSPI { + + /** + * Get type. + * + * @return type + */ + default String getType() { + return ""; + } + + /** + * Get type aliases. + * + * @return type aliases + */ + default Collection<String> getTypeAliases() { + return Collections.emptyList(); + } +} diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/typed/TypedSPIRegistry.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/typed/TypedSPIRegistry.java new file mode 100644 index 0000000..104da6d --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/typed/TypedSPIRegistry.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.infrastructure.validate.spi.typed; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import org.apache.rocketmq.eventbridge.exception.ServiceProviderNotFoundException; +import org.apache.rocketmq.eventbridge.infrastructure.validate.spi.ValidationServiceLoader; + +import java.util.Optional; +import java.util.Properties; + +/** + * Typed SPI registry. + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class TypedSPIRegistry { + + /** + * Find registered service. + * + * @param spiClass typed SPI class + * @param type type + * @param <T> SPI class type + * @return registered service + */ + public static <T extends TypedSPI> Optional<T> findRegisteredService(final Class<T> spiClass, final String type) { + for (T each : ValidationServiceLoader.getServiceInstances(spiClass)) { + if (matchesType(type, each)) { + return Optional.of(each); + } + } + return Optional.empty(); + } + + private static boolean matchesType(final String type, final TypedSPI instance) { + return instance.getType().equalsIgnoreCase(type) || instance.getTypeAliases().contains(type); + } + + private static Properties convertToStringTypedProperties(final Properties props) { + if (null == props) { + return new Properties(); + } + Properties result = new Properties(); + props.forEach((key, value) -> result.setProperty(key.toString(), null == value ? null : value.toString())); + return result; + } + + /** + * Get registered service. + * + * @param spiClass typed SPI class + * @param type type + * @param <T> SPI class type + * @return registered service + */ + public static <T extends TypedSPI> T getRegisteredService(final Class<T> spiClass, final String type) { + Optional<T> result = findRegisteredService(spiClass, type); + if (result.isPresent()) { + return result.get(); + } + throw new ServiceProviderNotFoundException(spiClass, type); + } +} diff --git a/adapter/persistence/src/main/resources/db/migration/V3__change_target_transform_type.sql b/infrastructure/src/main/resources/META-INF/services/org.apache.rocketmq.eventbridge.infrastructure.validate.AuthValidation similarity index 87% copy from adapter/persistence/src/main/resources/db/migration/V3__change_target_transform_type.sql copy to infrastructure/src/main/resources/META-INF/services/org.apache.rocketmq.eventbridge.infrastructure.validate.AuthValidation index edaf379..205672f 100644 --- a/adapter/persistence/src/main/resources/db/migration/V3__change_target_transform_type.sql +++ b/infrastructure/src/main/resources/META-INF/services/org.apache.rocketmq.eventbridge.infrastructure.validate.AuthValidation @@ -1,3 +1,4 @@ +# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. @@ -12,6 +13,5 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -ALTER TABLE `event_target_class` -CHANGE COLUMN `target_transform` `target_transform` TEXT NULL DEFAULT NULL ; +# +org.apache.rocketmq.eventbridge.infrastructure.validate.DefaultAuthValidation \ No newline at end of file diff --git a/start/src/main/java/org/apache/rocketmq/eventbridge/config/H2ServerConfig.java b/start/src/main/java/org/apache/rocketmq/eventbridge/config/H2ServerConfig.java new file mode 100644 index 0000000..e7c1990 --- /dev/null +++ b/start/src/main/java/org/apache/rocketmq/eventbridge/config/H2ServerConfig.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.eventbridge.config; + +import org.h2.tools.Server; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; +import org.springframework.context.event.ContextClosedEvent; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.context.event.EventListener; + +@Configuration +@Profile("local") +public class H2ServerConfig { + private Server webServer; + + + @EventListener(ContextRefreshedEvent.class) + public void start() throws java.sql.SQLException { + this.webServer = org.h2.tools.Server.createWebServer("-webPort", "8083", "-web", "-webAllowOthers", "-tcp", "-tcpAllowOthers", "-browser").start(); + } + + @EventListener(ContextClosedEvent.class) + public void stop() { + this.webServer.stop(); + } +} diff --git a/start/src/main/java/org/apache/rocketmq/eventbridge/filter/LoginFilter.java b/start/src/main/java/org/apache/rocketmq/eventbridge/filter/LoginFilter.java index af7cb04..5afafae 100644 --- a/start/src/main/java/org/apache/rocketmq/eventbridge/filter/LoginFilter.java +++ b/start/src/main/java/org/apache/rocketmq/eventbridge/filter/LoginFilter.java @@ -18,8 +18,6 @@ package org.apache.rocketmq.eventbridge.filter; import java.util.List; import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.eventbridge.exception.EventBridgeException; -import org.apache.rocketmq.eventbridge.exception.code.DefaultErrorCode; import org.springframework.core.annotation.Order; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.stereotype.Component; @@ -28,35 +26,27 @@ import org.springframework.web.server.WebFilter; import org.springframework.web.server.WebFilterChain; import reactor.core.publisher.Mono; +import static org.apache.rocketmq.eventbridge.enums.props.Constants.HEADER_KEY_LOGIN_ACCOUNT_ID; +import static org.apache.rocketmq.eventbridge.enums.props.Constants.HEADER_KEY_PARENT_ACCOUNT_ID; + @Component @Order(value = 2) @Slf4j public class LoginFilter implements WebFilter { - public static final String HEADER_KEY_LOGIN_ACCOUNT_ID = "loginAccountId"; - public static final String HEADER_KEY_PARENT_ACCOUNT_ID = "parentAccountId"; - public static final String HEADER_KEY_RESOURCE_OWNER_ACCOUNT_ID = "resourceOwnerAccountId"; - @Override public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { ServerHttpRequest request = exchange.getRequest(); return chain.filter(exchange) .subscriberContext(ctx -> { List<String> parentAccountIds = request.getHeaders() - .get(HEADER_KEY_PARENT_ACCOUNT_ID); + .get(HEADER_KEY_PARENT_ACCOUNT_ID.getName()); List<String> loginAccountIds = request.getHeaders() - .get(HEADER_KEY_LOGIN_ACCOUNT_ID); - List<String> resourceOwnerIds = request.getHeaders() - .get(HEADER_KEY_RESOURCE_OWNER_ACCOUNT_ID); - if (resourceOwnerIds == null || resourceOwnerIds.isEmpty()) { - throw new EventBridgeException(DefaultErrorCode.LoginFailed); - } - return ctx.put(HEADER_KEY_PARENT_ACCOUNT_ID, + .get(HEADER_KEY_LOGIN_ACCOUNT_ID.getName()); + return ctx.put(HEADER_KEY_PARENT_ACCOUNT_ID.getName(), parentAccountIds != null && !parentAccountIds.isEmpty() ? parentAccountIds.get(0) : "") - .put(HEADER_KEY_LOGIN_ACCOUNT_ID, - loginAccountIds != null && !loginAccountIds.isEmpty() ? loginAccountIds.get(0) : "") - .put(HEADER_KEY_RESOURCE_OWNER_ACCOUNT_ID, - resourceOwnerIds != null && !resourceOwnerIds.isEmpty() ? resourceOwnerIds.get(0) : ""); + .put(HEADER_KEY_LOGIN_ACCOUNT_ID.getName(), + loginAccountIds != null && !loginAccountIds.isEmpty() ? loginAccountIds.get(0) : ""); }); } } diff --git a/start/src/main/java/org/apache/rocketmq/eventbridge/filter/ValidateFilter.java b/start/src/main/java/org/apache/rocketmq/eventbridge/filter/ValidateFilter.java new file mode 100644 index 0000000..5d80328 --- /dev/null +++ b/start/src/main/java/org/apache/rocketmq/eventbridge/filter/ValidateFilter.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.eventbridge.filter; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.eventbridge.infrastructure.validate.AuthValidation; +import org.apache.rocketmq.eventbridge.infrastructure.validate.spi.ValidationServiceFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.core.annotation.Order; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.stereotype.Component; +import org.springframework.web.server.ServerWebExchange; +import org.springframework.web.server.WebFilter; +import org.springframework.web.server.WebFilterChain; +import reactor.core.publisher.Mono; +import reactor.util.context.Context; + +import javax.annotation.PostConstruct; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicReference; + +@Component +@Order(value = 3) +@Slf4j +public class ValidateFilter implements WebFilter { + + private List<AuthValidation> validations = new CopyOnWriteArrayList<>(); + + @Value(value="${auth.validation:default}") + private String validationName; + + @PostConstruct + public void init() { + Arrays.stream(validationName.split(",")).forEach(action->validations.add(ValidationServiceFactory.getInstance(action))); + } + + @Override + public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { + ServerHttpRequest request = exchange.getRequest(); + return chain.filter(exchange) + .subscriberContext(ctx -> { + AtomicReference<Context> result = new AtomicReference<Context>(); + validations.forEach(validation-> result.set(validation.validate(request, ctx))); + return result.get(); + }); + } +} diff --git a/start/src/main/resources/application.properties b/start/src/main/resources/application.properties index 185ba48..3736814 100644 --- a/start/src/main/resources/application.properties +++ b/start/src/main/resources/application.properties @@ -16,12 +16,12 @@ server.port=7001 management.server.port=7002 management.endpoints.web.base-path=/ +spring.profiles.active=local ## database -spring.datasource.url=jdbc:mysql://localhost:3306/eventBridge?useUnicode=true&characterEncoding=utf8&useSSL=false -spring.datasource.driver-class-name=com.mysql.jdbc.Driver -spring.datasource.username=xxxxx -spring.datasource.password=xxxxx -mybatis.mapper-locations=classpath:mybatis/*.xml +#spring.datasource.hikari.jdbc-url=jdbc:mysql://localhost:3306/eventBridge?useUnicode=true&characterEncoding=utf8&useSSL=false +#spring.datasource.hikari.driver-class-name=com.mysql.jdbc.Driver +#spring.datasource.hikari.username=xxxxx +#spring.datasource.hikari.password=xxxxx mybatis.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl ## flyway spring.flyway.placeholderReplacement=false
