joeCarf commented on code in PR #76:
URL: 
https://github.com/apache/rocketmq-schema-registry/pull/76#discussion_r1103598574


##########
client/src/main/java/org/apache/rocketmq/schema/registry/client/CachedSchemaRegistryClient.java:
##########
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import lombok.AllArgsConstructor;
+import 
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import org.apache.rocketmq.schema.registry.client.rest.RestService;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.dto.DeleteSchemeResponse;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaRequest;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto;
+import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaRequest;
+import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaResponse;
+
+public class CachedSchemaRegistryClient implements SchemaRegistryClient {
+
+    private static final String DEFAULT_TENANT = "default";
+    private static final String DEFAULT_CLUSTER = "default";
+    private static final int DEFAULT_CAPACITY = 100; // maximumSize used by the single-argument constructor
+    private static final int DEFAULT_DURATION = 10; // expiry, in minutes, used by the single-argument constructor
+
+    private final RestService restService;
+
+    private final Map<String, Set<Long>> subjectToId; // per "cluster/tenant/subject": record ids used to invalidate schemaCacheBySubjectAndId
+
+    private final Map<String, Set<Long>> subjectToVersion; // per "cluster/tenant/subject": versions used to invalidate schemaCacheBySubjectAndVersion
+
+    private final Map<String, Set<String>> subjectToSchema; // per "cluster/tenant/subject": schemas used to invalidate schemaCache
+
+    private final Cache<SubjectAndVersion, GetSchemaResponse> schemaCacheBySubjectAndVersion;
+
+    private final Cache<String, List<String>> subjectCache; // cache for subject
+
+    private final Cache<SubjectAndId, GetSchemaResponse> schemaCacheBySubjectAndId;
+
+    private final Cache<String, GetSchemaResponse> schemaCacheBySubject; // schema cache keyed by "cluster/tenant/subject" only
+
+    private final Cache<SubjectAndSchema, GetSchemaResponse> schemaCache; // schema cache keyed by SubjectAndSchema
+
+    private final Cache<String, List<String>> tenantCache;
+
+    /**
+     * Creates a client with the default cache settings: a capacity of
+     * {@code DEFAULT_CAPACITY} and a write-expiry of {@code DEFAULT_DURATION} minutes.
+     *
+     * @param restService REST client used for the remote registry calls
+     */
+    public CachedSchemaRegistryClient(RestService restService) {
+        this(restService, DEFAULT_CAPACITY, TimeUnit.MINUTES, DEFAULT_DURATION);
+    }
+
+    /**
+     * Creates a client with explicitly sized, write-expiring caches.
+     *
+     * @param restService REST client used for the remote registry calls
+     * @param capacity maximum number of entries per cache (the tenant cache is fixed at one entry)
+     * @param unit time unit that {@code duration} is expressed in
+     * @param duration time after a write at which an entry expires (the tenant cache uses 1 {@code unit})
+     */
+    public CachedSchemaRegistryClient(RestService restService, int capacity, TimeUnit unit, int duration) {
+        this.restService = restService;
+
+        // Bookkeeping maps consulted by deleteSchema to find the cache keys belonging to a subject.
+        this.subjectToId = new HashMap<>();
+        this.subjectToVersion = new HashMap<>();
+        this.subjectToSchema = new HashMap<>();
+
+        // build() does not alter the builder, so one configured builder yields several independent caches.
+        CacheBuilder<Object, Object> sizedBuilder =
+            CacheBuilder.newBuilder().maximumSize(capacity).expireAfterWrite(duration, unit);
+        this.subjectCache = sizedBuilder.build();
+        this.schemaCache = sizedBuilder.build();
+        this.schemaCacheBySubjectAndVersion = sizedBuilder.build();
+        this.schemaCacheBySubjectAndId = sizedBuilder.build();
+        this.schemaCacheBySubject = sizedBuilder.build();
+
+        // The tenant cache ignores capacity/duration: a single entry that expires after one unit.
+        this.tenantCache = CacheBuilder.newBuilder().maximumSize(1).expireAfterWrite(1, unit).build();
+    }
+
+    /**
+     * Forwards a schema registration to the REST service; no local cache is read or written.
+     *
+     * @param subject subject passed through to the REST service
+     * @param schemaName schema name passed through to the REST service
+     * @param request registration payload
+     * @return the response produced by {@code restService.registerSchema}
+     * @throws RestClientException propagated from the REST call
+     * @throws IOException propagated from the REST call
+     */
+    @Override
+    public RegisterSchemaResponse registerSchema(String subject, String schemaName,
+        RegisterSchemaRequest request) throws RestClientException, IOException {
+        final RegisterSchemaResponse registered = restService.registerSchema(subject, schemaName, request);
+        return registered;
+    }
+
+    /**
+     * Forwards a schema registration for an explicit cluster and tenant to the REST service;
+     * no local cache is read or written.
+     *
+     * @param clusterName cluster passed through to the REST service
+     * @param tenant tenant passed through to the REST service
+     * @param subjectName subject passed through to the REST service
+     * @param schemaName schema name passed through to the REST service
+     * @param request registration payload
+     * @return the response produced by {@code restService.registerSchema}
+     * @throws IOException propagated from the REST call
+     * @throws RestClientException propagated from the REST call
+     */
+    @Override
+    public RegisterSchemaResponse registerSchema(String clusterName, String tenant, String subjectName,
+        String schemaName, RegisterSchemaRequest request) throws IOException, RestClientException {
+        final RegisterSchemaResponse registered =
+            restService.registerSchema(clusterName, tenant, subjectName, schemaName, request);
+        return registered;
+    }
+
+    /**
+     * Invalidates every locally cached entry tracked for the subject, then asks the REST
+     * service to delete the subject.
+     *
+     * @param cluster cluster component of the cache key, also forwarded to the REST service
+     * @param tenant tenant component of the cache key, also forwarded to the REST service
+     * @param subject subject component of the cache key, also forwarded to the REST service
+     * @return the response produced by {@code restService.deleteSchema}
+     * @throws IOException propagated from the REST call
+     * @throws RestClientException propagated from the REST call
+     */
+    @Override
+    public DeleteSchemeResponse deleteSchema(String cluster, String tenant,
+        String subject) throws IOException, RestClientException {
+        final String cacheKey = String.format("%s/%s/%s", cluster, tenant, subject);
+        schemaCacheBySubject.invalidate(cacheKey);
+
+        // Map.remove returns the tracked set (or null), so each index is detached and drained in one step.
+        final Set<Long> trackedVersions = subjectToVersion.remove(cacheKey);
+        if (trackedVersions != null) {
+            for (Long version : trackedVersions) {
+                schemaCacheBySubjectAndVersion.invalidate(new SubjectAndVersion(cluster, tenant, subject, version));
+            }
+        }
+
+        final Set<Long> trackedIds = subjectToId.remove(cacheKey);
+        if (trackedIds != null) {
+            for (Long recordId : trackedIds) {
+                schemaCacheBySubjectAndId.invalidate(new SubjectAndId(cluster, tenant, subject, recordId));
+            }
+        }
+
+        final Set<String> trackedSchemas = subjectToSchema.remove(cacheKey);
+        if (trackedSchemas != null) {
+            for (String schema : trackedSchemas) {
+                schemaCache.invalidate(new SubjectAndSchema(cluster, tenant, subject, schema));
+            }
+        }
+
+        return restService.deleteSchema(cluster, tenant, subject);
+    }
+
+    /**
+     * Invalidates the cached entries for one version of a subject, then asks the REST
+     * service to delete that version.
+     *
+     * <p>The per-subject entry in {@code schemaCacheBySubject} is dropped as well, because it
+     * may be holding the version being deleted.
+     *
+     * @param cluster cluster component of the cache key, also forwarded to the REST service
+     * @param tenant tenant component of the cache key, also forwarded to the REST service
+     * @param subject subject component of the cache key, also forwarded to the REST service
+     * @param version version to delete
+     * @return the response produced by {@code restService.deleteSchema}
+     * @throws IOException propagated from the REST call
+     * @throws RestClientException propagated from the REST call
+     */
+    @Override
+    public DeleteSchemeResponse deleteSchema(String cluster, String tenant, String subject,
+        long version) throws IOException, RestClientException {
+        String subjectFullName = String.format("%s/%s/%s", cluster, tenant, subject);
+        schemaCacheBySubject.invalidate(subjectFullName);
+        schemaCacheBySubjectAndVersion.invalidate(new SubjectAndVersion(cluster, tenant, subject, version));
+
+        // No version of this subject may have been tracked yet (e.g. a fresh client);
+        // guard the lookup so the delete request is still sent instead of failing with an NPE.
+        Set<Long> trackedVersions = subjectToVersion.get(subjectFullName);
+        if (trackedVersions != null) {
+            trackedVersions.remove(version);
+        }
+        // NOTE(review): entries for this version in schemaCacheBySubjectAndId / schemaCache
+        // are not invalidated here - confirm whether that is intended.
+
+        return restService.deleteSchema(cluster, tenant, subject, version);
+    }
+
+    /**
+     * Invalidates the cached entries an update can make stale, then forwards the update to
+     * the REST service using the default cluster and tenant.
+     *
+     * @param subject subject whose schema is updated
+     * @param schemaName schema name used in the {@code schemaCache} key and forwarded to the REST service
+     * @param request update payload
+     * @return the response produced by {@code restService.updateSchema}
+     * @throws RestClientException propagated from the REST call
+     * @throws IOException propagated from the REST call
+     */
+    @Override
+    public UpdateSchemaResponse updateSchema(String subject, String schemaName,
+        UpdateSchemaRequest request) throws RestClientException, IOException {
+        // invalidate schemaCache
+        schemaCache.invalidate(new SubjectAndSchema(DEFAULT_CLUSTER, DEFAULT_TENANT, subject, schemaName));
+        // Also drop the per-subject entry (same key format getSchemaBySubject uses); otherwise
+        // getSchemaBySubject keeps serving the pre-update schema until the entry expires.
+        schemaCacheBySubject.invalidate(String.format("%s/%s/%s", DEFAULT_CLUSTER, DEFAULT_TENANT, subject));
+        return restService.updateSchema(subject, schemaName, request);
+    }
+
+    /**
+     * Invalidates the cached entries an update can make stale, then forwards the update for
+     * an explicit cluster and tenant to the REST service.
+     *
+     * @param cluster cluster component of the cache keys, also forwarded to the REST service
+     * @param tenant tenant component of the cache keys, also forwarded to the REST service
+     * @param subjectName subject whose schema is updated
+     * @param schemaName schema name used in the {@code schemaCache} key and forwarded to the REST service
+     * @param request update payload
+     * @return the response produced by {@code restService.updateSchema}
+     * @throws IOException propagated from the REST call
+     * @throws RestClientException propagated from the REST call
+     */
+    @Override
+    public UpdateSchemaResponse updateSchema(String cluster, String tenant, String subjectName,
+        String schemaName, UpdateSchemaRequest request) throws IOException, RestClientException {
+        // invalidate schemaCache
+        schemaCache.invalidate(new SubjectAndSchema(cluster, tenant, subjectName, schemaName));
+        // Also drop the per-subject entry (keyed "cluster/tenant/subject", as in deleteSchema);
+        // otherwise it keeps serving the pre-update schema until the entry expires.
+        schemaCacheBySubject.invalidate(String.format("%s/%s/%s", cluster, tenant, subjectName));
+        return restService.updateSchema(cluster, tenant, subjectName, schemaName, request);
+    }
+
+    @Override
+    public GetSchemaResponse getSchemaBySubject(String subject) throws 
RestClientException, IOException {
+        String fullName = String.format("%s/%s/%s", DEFAULT_CLUSTER, 
DEFAULT_TENANT, subject);
+        GetSchemaResponse result = schemaCacheBySubject.getIfPresent(fullName);
+        if (result != null) {
+            return result;
+        }
+        result = restService.getSchemaBySubject(subject);
+        schemaCacheBySubject.put(fullName, result);

Review Comment:
   I'm sorry, I don't understand this. Is this code used during the 
initialization process? Could you show me how this code is used?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to