This is an automated email from the ASF dual-hosted git repository.
karp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git
The following commit(s) were added to refs/heads/main by this push:
new 7d8def4 add list tenants rest interface to support flink list databases (#55)
7d8def4 is described below
commit 7d8def41c828793a88277c3c87bacfbde0f93d2e
Author: Humkum <[email protected]>
AuthorDate: Thu Sep 8 15:35:20 2022 +0800
add list tenants rest interface to support flink list databases (#55)
---
.../client/NormalSchemaRegistryClient.java | 5 +++++
.../registry/client/SchemaRegistryClient.java | 1 +
.../schema/registry/client/rest/RestService.java | 10 ++++++++--
.../registry/common/storage/StorageService.java | 4 ++++
.../common/storage/StorageServiceProxy.java | 8 ++++++++
.../registry/core/api/v1/SchemaController.java | 23 +++++++++++++++++++++-
.../registry/core/service/SchemaService.java | 2 ++
.../registry/core/service/SchemaServiceImpl.java | 12 +++++++++++
.../registry/storage/rocketmq/RocketmqClient.java | 14 ++++++++++++-
.../storage/rocketmq/RocketmqStorageClient.java | 8 +++++++-
.../rocketmq/RocketmqStorageClientImpl.java | 6 ++++++
.../storage/rocketmq/RocketmqStorageService.java | 4 ++++
12 files changed, 92 insertions(+), 5 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java
index 9276937..1256bb5 100644
--- a/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java
@@ -102,4 +102,9 @@ public class NormalSchemaRegistryClient implements SchemaRegistryClient {
return restService.getSubjectsByTenant(cluster, tenant);
}
+ @Override
+ public List<String> getAllTenants(String cluster) throws
RestClientException, IOException {
+ return restService.getAllTenants(cluster);
+ }
+
}
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java
index 24a3f65..7e02738 100644
--- a/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java
@@ -60,4 +60,5 @@ public interface SchemaRegistryClient {
List<String> getSubjectsByTenant(String cluster, String tenant) throws
RestClientException, IOException;
+ List<String> getAllTenants(String cluster) throws RestClientException,
IOException;
}
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java
index 40d01d4..704ac77 100644
--- a/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java
@@ -50,7 +50,7 @@ public class RestService {
private static final TypeReference<List<SchemaRecordDto>>
SCHEMA_RECORD_DTO_TYPE_LIST_REFERENCE =
new TypeReference<List<SchemaRecordDto>>() { };
- private static final TypeReference<List<String>> GET_SUBJECTS_REFERENCE =
+ private static final TypeReference<List<String>> LIST_STRING_REFERENCE =
new TypeReference<List<String>>() { };
public static ObjectMapper jsonParser = JacksonMapper.INSTANCE;
@@ -150,6 +150,12 @@ public class RestService {
public List<String> getSubjectsByTenant(String cluster, String tenant)
throws RestClientException, IOException {
UrlBuilder urlBuilder =
UrlBuilder.fromPath("/cluster/{cluster-name}/tenant/{tenant-name}/subjects");
String path = HttpUtil.buildRequestUrl(baseUri,
urlBuilder.build(cluster, tenant).toString());
- return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders,
GET_SUBJECTS_REFERENCE);
+ return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders,
LIST_STRING_REFERENCE);
+ }
+
+ public List<String> getAllTenants(String cluster) throws
RestClientException, IOException {
+ UrlBuilder urlBuilder =
UrlBuilder.fromPath("/cluster/{cluster-name}/tenants");
+ String path = HttpUtil.buildRequestUrl(baseUri,
urlBuilder.build(cluster).toString());
+ return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders,
LIST_STRING_REFERENCE);
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
index 173d0a7..8dc85a5 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
@@ -87,4 +87,8 @@ public interface StorageService<T extends BaseInfo> {
default List<String> listSubjectsByTenant(StorageServiceContext
storageServiceContext, QualifiedName name) {
throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
}
+
+ default List<String> listTenants(StorageServiceContext storageService,
QualifiedName name) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
index 77d9583..8f0457f 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
@@ -121,4 +121,12 @@ public class StorageServiceProxy {
return storageService.listSubjectsByTenant(storageServiceContext,
name);
}
+
+ public List<String> listTenants(final QualifiedName name) {
+ final RequestContext requestContext =
RequestContextManager.getContext();
+ final StorageServiceContext storageServiceContext =
storageUtil.convertToStorageServiceContext(requestContext);
+ final StorageService<SchemaInfo> storageService =
storageManager.getStorageService();
+
+ return storageService.listTenants(storageServiceContext, name);
+ }
}
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
index 43db0c2..0ad7d60 100644
--- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
@@ -463,8 +463,29 @@ public class SchemaController {
QualifiedName name = new QualifiedName(cluster, tenant, null, null);
return this.requestProcessor.processRequest(
- "getSubjectListByTenant",
+ "getSubjectListByTenant",
() -> schemaService.listSubjectsByTenant(name)
);
}
+
+
+ @RequestMapping(
+ method = RequestMethod.GET,
+ path = "/cluster/{cluster-name}/tenants"
+ )
+ @ApiOperation(
+ value = "tenants list",
+ notes = "all tenant from cluster"
+ )
+ public List<String> getTenants(
+ @ApiParam(value = "The cluster of the tenant", required = true)
+ @PathVariable(value = "cluster-name") final String cluster
+ ) {
+ QualifiedName name = new QualifiedName(cluster, null, null, null);
+
+ return this.requestProcessor.processRequest(
+ "getTenants",
+ () -> schemaService.listTenants(name)
+ );
+ }
}
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java
index 892232c..d7af10b 100644
--- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java
@@ -81,4 +81,6 @@ public interface SchemaService<T extends BaseDto> {
List<SchemaRecordDto> listBySubject(QualifiedName qualifiedName);
List<String> listSubjectsByTenant(QualifiedName qualifiedName);
+
+ List<String> listTenants(QualifiedName name);
}
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
index f169a84..2872f28 100644
--- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
@@ -287,6 +287,18 @@ public class SchemaServiceImpl implements SchemaService<SchemaDto> {
return subjects;
}
+ @Override
+ public List<String> listTenants(QualifiedName qualifiedName) {
+ final RequestContext requestContext =
RequestContextManager.getContext();
+ log.info("get request context: " + requestContext);
+
+ this.accessController.checkPermission("", qualifiedName.getCluster(),
SchemaOperation.GET);
+
+ List<String> tenants = storageServiceProxy.listTenants(qualifiedName);
+ log.info("list all tenants: {}", qualifiedName.getCluster());
+ return tenants;
+ }
+
private void checkSchemaExist(final QualifiedName qualifiedName) {
if (storageServiceProxy.get(qualifiedName) != null) {
throw new SchemaExistException(qualifiedName);
diff --git a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
index 452e637..bbc22ed 100644
--- a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
+++ b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
@@ -44,6 +44,7 @@ import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.schema.registry.common.QualifiedName;
+import org.apache.rocketmq.schema.registry.common.constant.SchemaConstants;
import
org.apache.rocketmq.schema.registry.common.context.StorageServiceContext;
import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
import
org.apache.rocketmq.schema.registry.common.exception.SchemaExistException;
@@ -417,7 +418,7 @@ public class RocketmqClient {
RocksIterator iterator = cache.newIterator(subjectCfHandle());
for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
String subjectFullName = new String(iterator.key());
- String[] subjectFromCache = subjectFullName.split("/");
+ String[] subjectFromCache =
subjectFullName.split(String.valueOf(SchemaConstants.SUBJECT_SEPARATOR));
String tenantFromKey = subjectFromCache[1];
String subjectFromKey = subjectFromCache[2];
if (isSuperAdmin(context.getUserName()) ||
tenant.equals(tenantFromKey)) {
@@ -427,6 +428,17 @@ public class RocketmqClient {
return subjects;
}
+ public List<String> getTenants(String cluster) {
+ List<String> tenants = new ArrayList<>();
+ RocksIterator iterator = cache.newIterator(subjectCfHandle());
+ for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
+ String subjectFullName = new String(iterator.key());
+ String tenant =
subjectFullName.split(String.valueOf(SchemaConstants.SUBJECT_SEPARATOR))[1];
+ tenants.add(tenant);
+ }
+ return tenants;
+ }
+
private boolean isSuperAdmin(String userName) {
// check superAdmin
return false;
diff --git a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java
index 1aebedf..0099025 100644
--- a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java
+++ b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java
@@ -84,5 +84,11 @@ public interface RocketmqStorageClient {
throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
}
- List<String> listSubjectsByTenant(StorageServiceContext context,
QualifiedName name);
+ default List<String> listSubjectsByTenant(StorageServiceContext context,
QualifiedName name) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
+ }
+
+ default List<String> listTenant(QualifiedName name) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
+ }
}
diff --git a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
index 78b5f55..0ffee01 100644
--- a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
+++ b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
@@ -122,7 +122,13 @@ public class RocketmqStorageClientImpl implements RocketmqStorageClient {
return schemaInfo.getDetails().getSchemaRecords();
}
+ @Override
public List<String> listSubjectsByTenant(StorageServiceContext context,
QualifiedName qualifiedName) {
return rocketmqClient.getSubjects(context, qualifiedName.getTenant());
}
+
+ @Override
+ public List<String> listTenant(QualifiedName qualifiedName) {
+ return rocketmqClient.getTenants(qualifiedName.getCluster());
+ }
}
diff --git a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java
index 817f257..1c91b89 100644
--- a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java
+++ b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java
@@ -87,4 +87,8 @@ public class RocketmqStorageService implements StorageService<SchemaInfo> {
public List<String> listSubjectsByTenant(StorageServiceContext context,
QualifiedName name) {
return storageClient.listSubjectsByTenant(context, name);
}
+
+ public List<String> listTenants(StorageServiceContext storageService,
QualifiedName name) {
+ return storageClient.listTenant(name);
+ }
}