This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git
The following commit(s) were added to refs/heads/main by this push:
new ae0cff6 add get subjects by tenant interface to support flink list table by subjects
new 2683b13 Merge pull request #33 from humkum/main
ae0cff6 is described below
commit ae0cff6941359139853b1f52575201bf88d399c5
Author: Hankunming <[email protected]>
AuthorDate: Thu Aug 11 16:23:24 2022 +0800
add get subjects by tenant interface to support flink list table by subjects
---
.../client/NormalSchemaRegistryClient.java | 6 ++++++
.../registry/client/SchemaRegistryClient.java | 2 ++
.../schema/registry/client/rest/RestService.java | 13 ++++++++++++-
.../registry/common/storage/StorageService.java | 4 ++++
.../common/storage/StorageServiceProxy.java | 8 ++++++++
.../registry/core/api/v1/SchemaController.java | 22 ++++++++++++++++++++++
.../registry/core/service/SchemaService.java | 2 ++
.../registry/core/service/SchemaServiceImpl.java | 12 ++++++++++++
.../schema/registry/example/DeleteSchemaDemo.java | 2 +-
.../schema/registry/example/GetSchemaDemo.java | 2 +-
.../registry/example/RegisterSchemaDemo.java | 2 +-
.../schema/registry/example/UpdateSchemaDemo.java | 2 +-
.../registry/storage/rocketmq/RocketmqClient.java | 22 ++++++++++++++++++++++
.../storage/rocketmq/RocketmqStorageClient.java | 3 +++
.../rocketmq/RocketmqStorageClientImpl.java | 5 +++++
.../storage/rocketmq/RocketmqStorageService.java | 4 ++++
16 files changed, 106 insertions(+), 5 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java
index b70295e..9276937 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java
@@ -96,4 +96,10 @@ public class NormalSchemaRegistryClient implements
SchemaRegistryClient {
return restService.getSchemaListBySubject(cluster, tenant, subject);
}
+ @Override
+ public List<String> getSubjectsByTenant(String cluster, String tenant)
+ throws RestClientException, IOException {
+ return restService.getSubjectsByTenant(cluster, tenant);
+ }
+
}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java
index 23b60b1..24a3f65 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java
@@ -58,4 +58,6 @@ public interface SchemaRegistryClient {
List<SchemaRecordDto> getSchemaListBySubject(String cluster, String tenant,
String subject) throws RestClientException, IOException;
+ List<String> getSubjectsByTenant(String cluster, String tenant) throws
RestClientException, IOException;
+
}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java
index f0138f5..40d01d4 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java
@@ -34,6 +34,8 @@ import
org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaRequest;
import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaResponse;
public class RestService {
+ private static final String API_PREFIX = "/schema-registry/v1";
+
private static final TypeReference<RegisterSchemaResponse>
REGISTER_SCHEMA_DTO_TYPE_REFERENCE =
new TypeReference<RegisterSchemaResponse>() { };
@@ -48,6 +50,9 @@ public class RestService {
private static final TypeReference<List<SchemaRecordDto>>
SCHEMA_RECORD_DTO_TYPE_LIST_REFERENCE =
new TypeReference<List<SchemaRecordDto>>() { };
+ private static final TypeReference<List<String>> GET_SUBJECTS_REFERENCE =
+ new TypeReference<List<String>>() { };
+
public static ObjectMapper jsonParser = JacksonMapper.INSTANCE;
private static final String HTTP_GET = "GET";
@@ -59,7 +64,7 @@ public class RestService {
private final Map<String, String> httpHeaders;
public RestService(String baseUri) {
- this.baseUri = baseUri;
+ this.baseUri = baseUri + API_PREFIX;
httpHeaders = new HashMap<>();
httpHeaders.put("Content-Type", "application/json");
}
@@ -141,4 +146,10 @@ public class RestService {
String path = HttpUtil.buildRequestUrl(baseUri,
urlBuilder.build(cluster, tenant, subject).toString());
return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders,
SCHEMA_RECORD_DTO_TYPE_LIST_REFERENCE);
}
+
+ public List<String> getSubjectsByTenant(String cluster, String tenant)
throws RestClientException, IOException {
+ UrlBuilder urlBuilder =
UrlBuilder.fromPath("/cluster/{cluster-name}/tenant/{tenant-name}/subjects");
+ String path = HttpUtil.buildRequestUrl(baseUri,
urlBuilder.build(cluster, tenant).toString());
+ return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders,
GET_SUBJECTS_REFERENCE);
+ }
}
diff --git
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
index 54de019..173d0a7 100644
---
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
+++
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
@@ -83,4 +83,8 @@ public interface StorageService<T extends BaseInfo> {
default List<SchemaRecordInfo> listBySubject(final StorageServiceContext
context, final QualifiedName name) {
throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
}
+
+ default List<String> listSubjectsByTenant(StorageServiceContext
storageServiceContext, QualifiedName name) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
+ }
}
diff --git
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
index d4439e3..77d9583 100644
---
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
+++
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
@@ -113,4 +113,12 @@ public class StorageServiceProxy {
return storageService.listBySubject(storageServiceContext, name);
}
+
+ public List<String> listSubjectsByTenant(final QualifiedName name) {
+ final RequestContext requestContext =
RequestContextManager.getContext();
+ final StorageServiceContext storageServiceContext =
storageUtil.convertToStorageServiceContext(requestContext);
+ final StorageService<SchemaInfo> storageService =
storageManager.getStorageService();
+
+ return storageService.listSubjectsByTenant(storageServiceContext,
name);
+ }
}
diff --git
a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
index c04fcf7..43db0c2 100644
---
a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
+++
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
@@ -445,4 +445,26 @@ public class SchemaController {
() -> schemaService.listBySubject(name)
);
}
+
+ @RequestMapping(
+ method = RequestMethod.GET,
+ path = "/cluster/{cluster-name}/tenant/{tenant-name}/subjects"
+ )
+ @ApiOperation(
+ value = "subjects information",
+ notes = "all subjects from a given tenant"
+ )
+ public List<String> getSubjectListByTenant(
+ @ApiParam(value = "The cluster of the subjects", required = true)
+ @PathVariable(value = "cluster-name") final String cluster,
+ @ApiParam(value = "The tenant subjects belong to", required = true)
+ @PathVariable(value = "tenant-name") final String tenant
+ ) {
+ QualifiedName name = new QualifiedName(cluster, tenant, null, null);
+
+ return this.requestProcessor.processRequest(
+ "getSubjectListByTenant",
+ () -> schemaService.listSubjectsByTenant(name)
+ );
+ }
}
diff --git
a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java
index 2d67437..892232c 100644
---
a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java
+++
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java
@@ -79,4 +79,6 @@ public interface SchemaService<T extends BaseDto> {
* @return schema object with the schemaName
*/
List<SchemaRecordDto> listBySubject(QualifiedName qualifiedName);
+
+ List<String> listSubjectsByTenant(QualifiedName qualifiedName);
}
diff --git
a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
index 2c2dca5..c172ab0 100644
---
a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
+++
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
@@ -267,6 +267,18 @@ public class SchemaServiceImpl implements
SchemaService<SchemaDto> {
return
recordInfos.stream().map(storageUtil::convertToSchemaRecordDto).collect(Collectors.toList());
}
+ public List<String> listSubjectsByTenant(QualifiedName qualifiedName) {
+ final RequestContext requestContext =
RequestContextManager.getContext();
+ log.info("get request context: " + requestContext);
+
+ this.accessController.checkPermission("", qualifiedName.getTenant(),
SchemaOperation.GET);
+
+ List<String> subjects =
storageServiceProxy.listSubjectsByTenant(qualifiedName);
+
+ log.info("list subjects by tenant: {}", qualifiedName.getTenant());
+ return subjects;
+ }
+
private void checkSchemaExist(final QualifiedName qualifiedName) {
if (storageServiceProxy.get(qualifiedName) != null) {
throw new SchemaExistException(qualifiedName);
diff --git
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/DeleteSchemaDemo.java
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/DeleteSchemaDemo.java
index 4fcb440..12510c6 100644
---
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/DeleteSchemaDemo.java
+++
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/DeleteSchemaDemo.java
@@ -28,7 +28,7 @@ public class DeleteSchemaDemo {
public static void main(String[] args) {
- String baseUrl = "http://localhost:8080/schema-registry/v1";
+ String baseUrl = "http://localhost:8080";
SchemaRegistryClient schemaRegistryClient =
SchemaRegistryClientFactory.newClient(baseUrl, null);
String topic = "TopicTest";
diff --git
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/GetSchemaDemo.java
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/GetSchemaDemo.java
index 7008ad3..7bc2975 100644
---
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/GetSchemaDemo.java
+++
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/GetSchemaDemo.java
@@ -28,7 +28,7 @@ public class GetSchemaDemo {
public static void main(String[] args) {
- String baseUrl = "http://localhost:8080/schema-registry/v1";
+ String baseUrl = "http://localhost:8080";
SchemaRegistryClient schemaRegistryClient =
SchemaRegistryClientFactory.newClient(baseUrl, null);
String topic = "TopicTest";
diff --git
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/RegisterSchemaDemo.java
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/RegisterSchemaDemo.java
index 6adfdfc..fc335c0 100644
---
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/RegisterSchemaDemo.java
+++
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/RegisterSchemaDemo.java
@@ -31,7 +31,7 @@ public class RegisterSchemaDemo {
public static void main(String[] args) {
- String baseUrl = "http://localhost:8080/schema-registry/v1";
+ String baseUrl = "http://localhost:8080";
SchemaRegistryClient schemaRegistryClient =
SchemaRegistryClientFactory.newClient(baseUrl, null);
String topic = "TopicTest";
diff --git
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/UpdateSchemaDemo.java
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/UpdateSchemaDemo.java
index ded7ce9..9d237e5 100644
---
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/UpdateSchemaDemo.java
+++
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/UpdateSchemaDemo.java
@@ -29,7 +29,7 @@ public class UpdateSchemaDemo {
public static void main(String[] args) {
- String baseUrl = "http://localhost:8080/schema-registry/v1";
+ String baseUrl = "http://localhost:8080";
SchemaRegistryClient schemaRegistryClient =
SchemaRegistryClientFactory.newClient(baseUrl, null);
String topic = "TopicTest";
diff --git
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
index e13f2af..fcd27fb 100644
---
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
+++
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
@@ -46,6 +46,7 @@ import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.schema.registry.common.QualifiedName;
+import
org.apache.rocketmq.schema.registry.common.context.StorageServiceContext;
import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
import
org.apache.rocketmq.schema.registry.common.exception.SchemaExistException;
import
org.apache.rocketmq.schema.registry.common.exception.SchemaNotFoundException;
@@ -63,6 +64,7 @@ import org.rocksdb.DBOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
import static
org.apache.rocketmq.schema.registry.storage.rocketmq.configs.RocketmqConfigConstants.DELETE_KEYS;
import static
org.apache.rocketmq.schema.registry.storage.rocketmq.configs.RocketmqConfigConstants.STORAGE_LOCAL_CACHE_PATH;
@@ -427,6 +429,26 @@ public class RocketmqClient {
return result == null ? null : converter.fromJson(result,
SchemaInfo.class);
}
+ public List<String> getSubjects(StorageServiceContext context, String
tenant) {
+ List<String> subjects = new ArrayList<>();
+ RocksIterator iterator = cache.newIterator(subjectCfHandle());
+ for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
+ String subjectFullName = new String(iterator.key());
+ String[] subjectFromCache = subjectFullName.split("/");
+ String tenantFromKey = subjectFromCache[1];
+ String subjectFromKey = subjectFromCache[2];
+ if (isSuperAdmin(context.getUserName()) ||
tenant.equals(tenantFromKey)) {
+ subjects.add(subjectFromKey);
+ }
+ }
+ return subjects;
+ }
+
+ private boolean isSuperAdmin(String userName) {
+ // check superAdmin
+ return false;
+ }
+
private void init(Properties props) {
this.useCompactTopic =
Boolean.parseBoolean(props.getProperty(STORAGE_ROCKETMQ_USE_COMPACT_TOPIC,
STORAGE_ROCKETMQ_USE_COMPACT_TOPIC_DEFAULT));
diff --git
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java
index 848258d..1aebedf 100644
---
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java
+++
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.schema.registry.storage.rocketmq;
import java.util.List;
import org.apache.rocketmq.schema.registry.common.QualifiedName;
+import
org.apache.rocketmq.schema.registry.common.context.StorageServiceContext;
import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
@@ -82,4 +83,6 @@ public interface RocketmqStorageClient {
default List<SchemaRecordInfo> listBySubject(QualifiedName qualifiedName) {
throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
}
+
+ List<String> listSubjectsByTenant(StorageServiceContext context,
QualifiedName name);
}
diff --git
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
index 9d3cdba..78b5f55 100644
---
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
+++
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
@@ -26,6 +26,7 @@ import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.schema.registry.common.QualifiedName;
import org.apache.rocketmq.schema.registry.common.context.StoragePluginContext;
+import
org.apache.rocketmq.schema.registry.common.context.StorageServiceContext;
import org.apache.rocketmq.schema.registry.common.json.JsonConverter;
import org.apache.rocketmq.schema.registry.common.json.JsonConverterImpl;
import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
@@ -120,4 +121,8 @@ public class RocketmqStorageClientImpl implements
RocketmqStorageClient {
}
return schemaInfo.getDetails().getSchemaRecords();
}
+
+ public List<String> listSubjectsByTenant(StorageServiceContext context,
QualifiedName qualifiedName) {
+ return rocketmqClient.getSubjects(context, qualifiedName.getTenant());
+ }
}
diff --git
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java
index f0c66f6..817f257 100644
---
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java
+++
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java
@@ -83,4 +83,8 @@ public class RocketmqStorageService implements
StorageService<SchemaInfo> {
public List<SchemaRecordInfo> listBySubject(StorageServiceContext context,
QualifiedName name) {
return storageClient.listBySubject(name);
}
+
+ public List<String> listSubjectsByTenant(StorageServiceContext context,
QualifiedName name) {
+ return storageClient.listSubjectsByTenant(context, name);
+ }
}