merlimat commented on a change in pull request #14283:
URL: https://github.com/apache/pulsar/pull/14283#discussion_r806384123
##########
File path:
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
##########
@@ -69,4 +69,10 @@
*/
@Builder.Default
private final int batchingMaxSizeKb = 128;
+
+ /**
+ * Metadata store cache executor thread pool size.
+ */
+ @Builder.Default
+ private final int metadataCacheExecutorThreadPoolSize = 10;
Review comment:
What is the reason for having 10 threads?
##########
File path:
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
##########
@@ -261,12 +268,17 @@ public void invalidate(String path) {
}
@Override
- public void refresh(String path) {
+ public CompletableFuture<Void> refresh(String path) {
// Refresh object of path if only it is cached before.
- if (objCache.getIfPresent(path) != null) {
- objCache.synchronous().invalidate(path);
- objCache.synchronous().refresh(path);
- }
+ CompletableFuture<Void> promise = new CompletableFuture<>();
+ scheduler.executeOrdered(path, () -> {
Review comment:
Why do we need to run in a different thread?
##########
File path:
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
##########
@@ -159,4 +158,11 @@
* @return the metadata cache object
*/
<T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde);
+
+ /**
+ * Get metadata store config.
+ *
+ * @return the metadata store config.
+ */
+ MetadataStoreConfig getMetadataStoreConfig();
Review comment:
Why exposing the config as part of API here?
##########
File path:
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Notifiable.java
##########
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.api;
+
+import java.util.concurrent.CompletableFuture;
+
+public interface Notifiable<T> {
Review comment:
`Notifiable` doesn't sound correct as a name. Also, if we're always
using this with a `Notification` arg, we don't need this to be a generic type.
eg:
```java
interface NotificationListener {
CompletableFuture<Void> acceptAsync(Notification n);
}
```
##########
File path:
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
##########
@@ -261,12 +261,17 @@ public void invalidate(String path) {
}
@Override
- public void refresh(String path) {
+ public CompletableFuture<Void> refresh(String path) {
// Refresh object of path if only it is cached before.
- if (objCache.getIfPresent(path) != null) {
- objCache.synchronous().invalidate(path);
- objCache.synchronous().refresh(path);
- }
+ CompletableFuture<Void> promise = new CompletableFuture<>();
+ this.store.getMetadataCacheScheduler().executeOrdered(path, () -> {
+ if (objCache.getIfPresent(path) != null) {
+ objCache.synchronous().invalidate(path);
+ objCache.synchronous().refresh(path);
+ }
+ promise.complete(null);
+ });
+ return promise;
Review comment:
> I'm not sure if it helps, but in [Caffeine
3.x](https://github.com/ben-manes/caffeine/releases/tag/v3.0.0) the refresh
behavior was improved to be linearizable. It also now returns te in-flight
CompletableFuture. You might consider upgrading.
We're still compiling with compatibility to Java 8, so we're not yet able to
do so, though we plan to raise min Java compile version soon and we'll
certainly upgrade to Caffeine 3.x then.
> I am also not sure why this is invalidating and refreshing. Since the
value is no longer present then the refresh is simply a cache load, and itself
delegates to AsyncCache.get(key), which of course also returns that future. You
might also consider using if (objCache.asMap().remove(key) != null) { ... }
instead of the get/invalidate.
The reason was that in 2.x `refresh()` doesn't return a future so we don't
know when it's completed. Because of that, in order to not return a stale
object from the cache, we trigger the invalidation just before.
> Trying to grok the code, I'd probably use
objCache.asMap().computeIfPresent(key, (k, future) -> ...) to explicitly store
a new future and return that upstream. Then it is all an atomic swap with very
clear code that localizes and shows your intent.
That's a very good point. I think that is probably the best approach (at
least until we get to Caffeine 3.x).
##########
File path:
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
##########
@@ -261,12 +268,17 @@ public void invalidate(String path) {
}
@Override
- public void refresh(String path) {
+ public CompletableFuture<Void> refresh(String path) {
// Refresh object of path if only it is cached before.
- if (objCache.getIfPresent(path) != null) {
- objCache.synchronous().invalidate(path);
- objCache.synchronous().refresh(path);
- }
+ CompletableFuture<Void> promise = new CompletableFuture<>();
+ scheduler.executeOrdered(path, () -> {
Review comment:
@codelipenghui the invalidate is just removing form the local cache map
so it's not blocking. `refresh()` will initiate the refilling of the cache in
background and will not block either.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]