This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git


The following commit(s) were added to refs/heads/main by this push:
     new ae0cff6  add get subjects by tenant interface to support flink list table by subjects
     new 2683b13  Merge pull request #33 from humkum/main
ae0cff6 is described below

commit ae0cff6941359139853b1f52575201bf88d399c5
Author: Hankunming <[email protected]>
AuthorDate: Thu Aug 11 16:23:24 2022 +0800

    add get subjects by tenant interface to support flink list table by subjects
---
 .../client/NormalSchemaRegistryClient.java         |  6 ++++++
 .../registry/client/SchemaRegistryClient.java      |  2 ++
 .../schema/registry/client/rest/RestService.java   | 13 ++++++++++++-
 .../registry/common/storage/StorageService.java    |  4 ++++
 .../common/storage/StorageServiceProxy.java        |  8 ++++++++
 .../registry/core/api/v1/SchemaController.java     | 22 ++++++++++++++++++++++
 .../registry/core/service/SchemaService.java       |  2 ++
 .../registry/core/service/SchemaServiceImpl.java   | 12 ++++++++++++
 .../schema/registry/example/DeleteSchemaDemo.java  |  2 +-
 .../schema/registry/example/GetSchemaDemo.java     |  2 +-
 .../registry/example/RegisterSchemaDemo.java       |  2 +-
 .../schema/registry/example/UpdateSchemaDemo.java  |  2 +-
 .../registry/storage/rocketmq/RocketmqClient.java  | 22 ++++++++++++++++++++++
 .../storage/rocketmq/RocketmqStorageClient.java    |  3 +++
 .../rocketmq/RocketmqStorageClientImpl.java        |  5 +++++
 .../storage/rocketmq/RocketmqStorageService.java   |  4 ++++
 16 files changed, 106 insertions(+), 5 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java
index b70295e..9276937 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java
@@ -96,4 +96,10 @@ public class NormalSchemaRegistryClient implements 
SchemaRegistryClient {
         return restService.getSchemaListBySubject(cluster, tenant, subject);
     }
 
+    @Override
+    public List<String> getSubjectsByTenant(String cluster, String tenant)
+            throws RestClientException, IOException {
+        return restService.getSubjectsByTenant(cluster, tenant);
+    }
+
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java
index 23b60b1..24a3f65 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java
@@ -58,4 +58,6 @@ public interface SchemaRegistryClient {
     List<SchemaRecordDto> getSchemaListBySubject(String cluster, String tenant,
         String subject) throws RestClientException, IOException;
 
+    List<String> getSubjectsByTenant(String cluster, String tenant) throws 
RestClientException, IOException;
+
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java
index f0138f5..40d01d4 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java
@@ -34,6 +34,8 @@ import 
org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaRequest;
 import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaResponse;
 
 public class RestService {
+    private static final String API_PREFIX = "/schema-registry/v1";
+
     private static final TypeReference<RegisterSchemaResponse> 
REGISTER_SCHEMA_DTO_TYPE_REFERENCE =
         new TypeReference<RegisterSchemaResponse>() { };
 
@@ -48,6 +50,9 @@ public class RestService {
     private static final TypeReference<List<SchemaRecordDto>> 
SCHEMA_RECORD_DTO_TYPE_LIST_REFERENCE =
         new TypeReference<List<SchemaRecordDto>>() { };
 
+    private static final TypeReference<List<String>> GET_SUBJECTS_REFERENCE =
+        new TypeReference<List<String>>() { };
+
     public static ObjectMapper jsonParser = JacksonMapper.INSTANCE;
 
     private static final String HTTP_GET = "GET";
@@ -59,7 +64,7 @@ public class RestService {
     private final Map<String, String> httpHeaders;
 
     public RestService(String baseUri) {
-        this.baseUri = baseUri;
+        this.baseUri = baseUri + API_PREFIX;
         httpHeaders = new HashMap<>();
         httpHeaders.put("Content-Type", "application/json");
     }
@@ -141,4 +146,10 @@ public class RestService {
         String path = HttpUtil.buildRequestUrl(baseUri, 
urlBuilder.build(cluster, tenant, subject).toString());
         return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders, 
SCHEMA_RECORD_DTO_TYPE_LIST_REFERENCE);
     }
+
+    public List<String> getSubjectsByTenant(String cluster, String tenant) 
throws RestClientException, IOException {
+        UrlBuilder urlBuilder = 
UrlBuilder.fromPath("/cluster/{cluster-name}/tenant/{tenant-name}/subjects");
+        String path = HttpUtil.buildRequestUrl(baseUri, 
urlBuilder.build(cluster, tenant).toString());
+        return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders, 
GET_SUBJECTS_REFERENCE);
+    }
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
 
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
index 54de019..173d0a7 100644
--- 
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
+++ 
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
@@ -83,4 +83,8 @@ public interface StorageService<T extends BaseInfo> {
     default List<SchemaRecordInfo> listBySubject(final StorageServiceContext 
context, final QualifiedName name) {
         throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
     }
+
+    default List<String> listSubjectsByTenant(StorageServiceContext 
storageServiceContext, QualifiedName name) {
+        throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
+    }
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
 
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
index d4439e3..77d9583 100644
--- 
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
+++ 
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
@@ -113,4 +113,12 @@ public class StorageServiceProxy {
 
         return storageService.listBySubject(storageServiceContext, name);
     }
+
+    public List<String> listSubjectsByTenant(final QualifiedName name) {
+        final RequestContext requestContext = 
RequestContextManager.getContext();
+        final StorageServiceContext storageServiceContext = 
storageUtil.convertToStorageServiceContext(requestContext);
+        final StorageService<SchemaInfo> storageService = 
storageManager.getStorageService();
+
+        return storageService.listSubjectsByTenant(storageServiceContext, 
name);
+    }
 }
diff --git 
a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
 
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
index c04fcf7..43db0c2 100644
--- 
a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
+++ 
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
@@ -445,4 +445,26 @@ public class SchemaController {
             () -> schemaService.listBySubject(name)
         );
     }
+
+    @RequestMapping(
+            method = RequestMethod.GET,
+            path = "/cluster/{cluster-name}/tenant/{tenant-name}/subjects"
+        )
+    @ApiOperation(
+            value = "subjects information",
+            notes = "all subjects from a given tenant"
+        )
+    public List<String> getSubjectListByTenant(
+            @ApiParam(value = "The cluster of the subjects", required = true)
+            @PathVariable(value = "cluster-name") final String cluster,
+            @ApiParam(value = "The tenant subjects belong to", required = true)
+            @PathVariable(value = "tenant-name") final String tenant
+    ) {
+        QualifiedName name = new QualifiedName(cluster, tenant, null, null);
+
+        return this.requestProcessor.processRequest(
+                "getSubjectListByTenant",
+            () -> schemaService.listSubjectsByTenant(name)
+        );
+    }
 }
diff --git 
a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java
 
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java
index 2d67437..892232c 100644
--- 
a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java
+++ 
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java
@@ -79,4 +79,6 @@ public interface SchemaService<T extends BaseDto> {
      * @return schema object with the schemaName
      */
     List<SchemaRecordDto> listBySubject(QualifiedName qualifiedName);
+
+    List<String> listSubjectsByTenant(QualifiedName qualifiedName);
 }
diff --git 
a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
 
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
index 2c2dca5..c172ab0 100644
--- 
a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
+++ 
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
@@ -267,6 +267,18 @@ public class SchemaServiceImpl implements 
SchemaService<SchemaDto> {
         return 
recordInfos.stream().map(storageUtil::convertToSchemaRecordDto).collect(Collectors.toList());
     }
 
+    public List<String> listSubjectsByTenant(QualifiedName qualifiedName) {
+        final RequestContext requestContext = 
RequestContextManager.getContext();
+        log.info("get request context: " + requestContext);
+
+        this.accessController.checkPermission("", qualifiedName.getTenant(), 
SchemaOperation.GET);
+
+        List<String> subjects = 
storageServiceProxy.listSubjectsByTenant(qualifiedName);
+
+        log.info("list subjects by tenant: {}", qualifiedName.getTenant());
+        return subjects;
+    }
+
     private void checkSchemaExist(final QualifiedName qualifiedName) {
         if (storageServiceProxy.get(qualifiedName) != null) {
             throw new SchemaExistException(qualifiedName);
diff --git 
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/DeleteSchemaDemo.java
 
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/DeleteSchemaDemo.java
index 4fcb440..12510c6 100644
--- 
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/DeleteSchemaDemo.java
+++ 
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/DeleteSchemaDemo.java
@@ -28,7 +28,7 @@ public class DeleteSchemaDemo {
 
     public static void main(String[] args) {
 
-        String baseUrl = "http://localhost:8080/schema-registry/v1";
+        String baseUrl = "http://localhost:8080";
         SchemaRegistryClient schemaRegistryClient = 
SchemaRegistryClientFactory.newClient(baseUrl, null);
 
         String topic = "TopicTest";
diff --git 
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/GetSchemaDemo.java
 
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/GetSchemaDemo.java
index 7008ad3..7bc2975 100644
--- 
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/GetSchemaDemo.java
+++ 
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/GetSchemaDemo.java
@@ -28,7 +28,7 @@ public class GetSchemaDemo {
 
     public static void main(String[] args) {
 
-        String baseUrl = "http://localhost:8080/schema-registry/v1";
+        String baseUrl = "http://localhost:8080";
         SchemaRegistryClient schemaRegistryClient = 
SchemaRegistryClientFactory.newClient(baseUrl, null);
 
         String topic = "TopicTest";
diff --git 
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/RegisterSchemaDemo.java
 
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/RegisterSchemaDemo.java
index 6adfdfc..fc335c0 100644
--- 
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/RegisterSchemaDemo.java
+++ 
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/RegisterSchemaDemo.java
@@ -31,7 +31,7 @@ public class RegisterSchemaDemo {
 
     public static void main(String[] args) {
 
-        String baseUrl = "http://localhost:8080/schema-registry/v1";
+        String baseUrl = "http://localhost:8080";
         SchemaRegistryClient schemaRegistryClient = 
SchemaRegistryClientFactory.newClient(baseUrl, null);
 
         String topic = "TopicTest";
diff --git 
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/UpdateSchemaDemo.java
 
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/UpdateSchemaDemo.java
index ded7ce9..9d237e5 100644
--- 
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/UpdateSchemaDemo.java
+++ 
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/UpdateSchemaDemo.java
@@ -29,7 +29,7 @@ public class UpdateSchemaDemo {
 
     public static void main(String[] args) {
 
-        String baseUrl = "http://localhost:8080/schema-registry/v1";
+        String baseUrl = "http://localhost:8080";
         SchemaRegistryClient schemaRegistryClient = 
SchemaRegistryClientFactory.newClient(baseUrl, null);
 
         String topic = "TopicTest";
diff --git 
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
 
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
index e13f2af..fcd27fb 100644
--- 
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
+++ 
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
@@ -46,6 +46,7 @@ import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.schema.registry.common.QualifiedName;
+import 
org.apache.rocketmq.schema.registry.common.context.StorageServiceContext;
 import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
 import 
org.apache.rocketmq.schema.registry.common.exception.SchemaExistException;
 import 
org.apache.rocketmq.schema.registry.common.exception.SchemaNotFoundException;
@@ -63,6 +64,7 @@ import org.rocksdb.DBOptions;
 import org.rocksdb.Options;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
 
 import static 
org.apache.rocketmq.schema.registry.storage.rocketmq.configs.RocketmqConfigConstants.DELETE_KEYS;
 import static 
org.apache.rocketmq.schema.registry.storage.rocketmq.configs.RocketmqConfigConstants.STORAGE_LOCAL_CACHE_PATH;
@@ -427,6 +429,26 @@ public class RocketmqClient {
         return result == null ? null : converter.fromJson(result, 
SchemaInfo.class);
     }
 
+    public List<String> getSubjects(StorageServiceContext context, String 
tenant) {
+        List<String> subjects = new ArrayList<>();
+        RocksIterator iterator = cache.newIterator(subjectCfHandle());
+        for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
+            String subjectFullName = new String(iterator.key());
+            String[] subjectFromCache = subjectFullName.split("/");
+            String tenantFromKey = subjectFromCache[1];
+            String subjectFromKey = subjectFromCache[2];
+            if (isSuperAdmin(context.getUserName()) || 
tenant.equals(tenantFromKey)) {
+                subjects.add(subjectFromKey);
+            }
+        }
+        return subjects;
+    }
+
+    private boolean isSuperAdmin(String userName) {
+        // check superAdmin
+        return false;
+    }
+
     private void init(Properties props) {
         this.useCompactTopic = 
Boolean.parseBoolean(props.getProperty(STORAGE_ROCKETMQ_USE_COMPACT_TOPIC,
             STORAGE_ROCKETMQ_USE_COMPACT_TOPIC_DEFAULT));
diff --git 
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java
 
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java
index 848258d..1aebedf 100644
--- 
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java
+++ 
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.schema.registry.storage.rocketmq;
 
 import java.util.List;
 import org.apache.rocketmq.schema.registry.common.QualifiedName;
+import 
org.apache.rocketmq.schema.registry.common.context.StorageServiceContext;
 import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
 import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
 
@@ -82,4 +83,6 @@ public interface RocketmqStorageClient {
     default List<SchemaRecordInfo> listBySubject(QualifiedName qualifiedName) {
         throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
     }
+
+    List<String> listSubjectsByTenant(StorageServiceContext context, 
QualifiedName name);
 }
diff --git 
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
 
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
index 9d3cdba..78b5f55 100644
--- 
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
+++ 
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
@@ -26,6 +26,7 @@ import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.schema.registry.common.QualifiedName;
 import org.apache.rocketmq.schema.registry.common.context.StoragePluginContext;
+import 
org.apache.rocketmq.schema.registry.common.context.StorageServiceContext;
 import org.apache.rocketmq.schema.registry.common.json.JsonConverter;
 import org.apache.rocketmq.schema.registry.common.json.JsonConverterImpl;
 import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
@@ -120,4 +121,8 @@ public class RocketmqStorageClientImpl implements 
RocketmqStorageClient {
         }
         return schemaInfo.getDetails().getSchemaRecords();
     }
+
+    public List<String> listSubjectsByTenant(StorageServiceContext context, 
QualifiedName qualifiedName) {
+        return rocketmqClient.getSubjects(context, qualifiedName.getTenant());
+    }
 }
diff --git 
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java
 
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java
index f0c66f6..817f257 100644
--- 
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java
+++ 
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java
@@ -83,4 +83,8 @@ public class RocketmqStorageService implements 
StorageService<SchemaInfo> {
     public List<SchemaRecordInfo> listBySubject(StorageServiceContext context, 
QualifiedName name) {
         return storageClient.listBySubject(name);
     }
+
+    public List<String> listSubjectsByTenant(StorageServiceContext context, 
QualifiedName name) {
+        return storageClient.listSubjectsByTenant(context, name);
+    }
 }

Reply via email to