codelipenghui commented on a change in pull request #14283:
URL: https://github.com/apache/pulsar/pull/14283#discussion_r806437028
##########
File path:
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
##########
@@ -261,12 +268,17 @@ public void invalidate(String path) {
}
@Override
- public void refresh(String path) {
+ public CompletableFuture<Void> refresh(String path) {
// Refresh object of path if only it is cached before.
- if (objCache.getIfPresent(path) != null) {
- objCache.synchronous().invalidate(path);
- objCache.synchronous().refresh(path);
- }
+ CompletableFuture<Void> promise = new CompletableFuture<>();
+ scheduler.executeOrdered(path, () -> {
Review comment:
From the logs, we can see that the thread name is `metadata-store-2437-1` or
`main`, i.e. the caller thread. The lines quoted below call synchronous methods,
so the thread is blocked until the refresh is done. I tested this case with
@Demogorgon314 yesterday: when the metadata get operation's future is simulated
to complete after a 10-second delay, the `metadata-store-2437-1` or `main`
thread is blocked for 10 seconds.
```java
if (objCache.getIfPresent(path) != null) {
objCache.synchronous().invalidate(path);
objCache.synchronous().refresh(path);
}
```
##########
File path:
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
##########
@@ -69,4 +69,10 @@
*/
@Builder.Default
private final int batchingMaxSizeKb = 128;
+
+ /**
+ * Metadata store cache executor thread pool size.
+ */
+ @Builder.Default
+ private final int metadataCacheExecutorThreadPoolSize = 10;
Review comment:
Maybe the default could match the number of CPU cores instead of a fixed 10?
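One way to read this suggestion, sketched in plain Java without Lombok: derive the default from `Runtime.getRuntime().availableProcessors()` rather than hard-coding a number. The class `CacheExecutorDefaults` and its method are hypothetical names for illustration and are not part of the PR.

```java
// Hypothetical sketch, not the PR's code: default the pool size to the CPU core count.
final class CacheExecutorDefaults {
    private CacheExecutorDefaults() {
    }

    /** Returns the number of processors available to the JVM (never smaller than one). */
    static int defaultThreadPoolSize() {
        return Runtime.getRuntime().availableProcessors();
    }

    public static void main(String[] args) {
        System.out.println("default pool size = " + defaultThreadPoolSize());
    }
}
```

In `MetadataStoreConfig` itself, the same expression could presumably serve as the `@Builder.Default` initializer for `metadataCacheExecutorThreadPoolSize`.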
##########
File path:
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
##########
@@ -261,12 +268,17 @@ public void invalidate(String path) {
}
@Override
- public void refresh(String path) {
+ public CompletableFuture<Void> refresh(String path) {
// Refresh object of path if only it is cached before.
- if (objCache.getIfPresent(path) != null) {
- objCache.synchronous().invalidate(path);
- objCache.synchronous().refresh(path);
- }
+ CompletableFuture<Void> promise = new CompletableFuture<>();
+ scheduler.executeOrdered(path, () -> {
Review comment:
```diff
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -397,7 +397,16 @@ public class RocksdbMetadataStore extends AbstractMetadataStore {
metaValue.getModifiedTimestamp(),
metaValue.ephemeral,
metaValue.getOwner() == instanceId));
- return CompletableFuture.completedFuture(Optional.of(result));
+ CompletableFuture<Optional<GetResult>> future = new CompletableFuture<>();
+ new Thread(() -> {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ future.complete(Optional.of(result));
+ }).start();
+ return future;
} catch (Throwable e) {
return FutureUtil.failedFuture(MetadataStoreException.wrap(e));
} finally {
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
index a6292e0b3ab..4414ca7061c 100644
```
Here is the result:
```
2022-02-15T15:19:48,482+0800 WARN [main] o.a.z.s.ServerCnxnFactory@309 - maxCnxns is not configured, using default value 0. {}
2022-02-15T15:19:48,558+0800 INFO [main] o.a.p.m.TestZKServer@74 - Started test ZK server on port 49310 {}
2022-02-15T15:19:48,578+0800 INFO [main] o.a.p.m.TestZKServer@180 - ZK Server UP {}
2022-02-15T15:19:49,440+0800 INFO [main] o.a.p.m.i.RocksdbMetadataStore@244 - new RocksdbMetadataStore,url=org.apache.pulsar.metadata.api.MetadataStoreConfig@49bd54f7,instanceId=1 {}
2022-02-15T15:19:49,722+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@271 - 1. Refresh path /key-83830190436902 {}
2022-02-15T15:19:49,723+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@282 - Refresh triggered for /key-83830190436902 {}
2022-02-15T15:19:49,723+0800 INFO [pool-3-thread-1] o.a.p.m.c.i.MetadataCacheImpl@278 - 0. Skip Refresh path /key-83830190436902 {}
2022-02-15T15:19:49,728+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@83 - Async load for objCache /key-83830190436902 {}
2022-02-15T15:19:59,778+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@298 - 3. Refresh cache by notification /key-83830190436902 {}
2022-02-15T15:19:59,778+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@271 - 1. Refresh path /key-83830190436902 {}
2022-02-15T15:19:59,778+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@282 - Refresh triggered for /key-83830190436902 {}
2022-02-15T15:19:59,778+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@271 - 1. Refresh path /key-83830190436902 {}
2022-02-15T15:19:59,779+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@282 - Refresh triggered for /key-83830190436902 {}
2022-02-15T15:19:59,779+0800 INFO [pool-3-thread-1] o.a.p.m.c.i.MetadataCacheImpl@83 - Async load for objCache /key-83830190436902 {}
2022-02-15T15:20:09,783+0800 INFO [pool-3-thread-1] o.a.p.m.c.i.MetadataCacheImpl@276 - 2. Refresh path complete /key-83830190436902:MetadataCacheTest.MyClass(a=a, b=2) {}
2022-02-15T15:20:09,783+0800 INFO [pool-3-thread-1] o.a.p.m.c.i.MetadataCacheImpl@83 - Async load for objCache /key-83830190436902 {}
2022-02-15T15:20:19,788+0800 INFO [pool-3-thread-1] o.a.p.m.c.i.MetadataCacheImpl@276 - 2. Refresh path complete /key-83830190436902:MetadataCacheTest.MyClass(a=a, b=2) {}
2022-02-15T15:20:19,788+0800 INFO [pool-3-thread-1] o.a.p.m.c.i.MetadataCacheImpl@271 - 1. Refresh path /key-83830190436902 {}
2022-02-15T15:20:19,789+0800 INFO [pool-3-thread-1] o.a.p.m.c.i.MetadataCacheImpl@282 - Refresh triggered for /key-83830190436902 {}
2022-02-15T15:20:19,789+0800 INFO [pool-3-thread-1] o.a.p.m.c.i.MetadataCacheImpl@83 - Async load for objCache /key-83830190436902 {}
2022-02-15T15:20:29,794+0800 INFO [pool-3-thread-1] o.a.p.m.c.i.MetadataCacheImpl@276 - 2. Refresh path complete /key-83830190436902:MetadataCacheTest.MyClass(a=a, b=2) {}
2022-02-15T15:20:29,794+0800 INFO [main] o.a.p.m.MetadataCacheTest@467 - Get new value from cache /key-83830190436902:MetadataCacheTest.MyClass(a=a, b=2) {}
2022-02-15T15:20:29,795+0800 INFO [main] o.a.p.m.c.i.MetadataCacheImpl@83 - Async load for objCache /key-83870530038528 {}
2022-02-15T15:20:29,797+0800 INFO [main] o.a.p.m.i.RocksdbMetadataStore@359 - close.instanceId=1 {}
```
##########
File path:
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
##########
@@ -261,12 +268,17 @@ public void invalidate(String path) {
}
@Override
- public void refresh(String path) {
+ public CompletableFuture<Void> refresh(String path) {
// Refresh object of path if only it is cached before.
- if (objCache.getIfPresent(path) != null) {
- objCache.synchronous().invalidate(path);
- objCache.synchronous().refresh(path);
- }
+ CompletableFuture<Void> promise = new CompletableFuture<>();
+ scheduler.executeOrdered(path, () -> {
Review comment:
Oh, my mistake. The thread is not blocked in the `refresh()` method; the
blocking comes from the test calling `.join()` on `readModifyUpdate`.
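To illustrate the distinction drawn here, a minimal sketch using only `java.util.concurrent` (it assumes Java 9+ for `CompletableFuture.delayedExecutor`). `slowGet` is a hypothetical stand-in for a slow asynchronous metadata read; it is not Pulsar's `readModifyUpdate`. Obtaining the future returns almost immediately, while calling `.join()` on it holds the calling thread for the full delay.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class JoinBlocksCallerSketch {
    private JoinBlocksCallerSketch() {
    }

    /** Hypothetical async read whose future completes after delayMs milliseconds. */
    static CompletableFuture<String> slowGet(long delayMs) {
        return CompletableFuture.supplyAsync(
                () -> "value",
                CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS));
    }

    /** Time the caller spends when it only obtains the future and does not wait on it. */
    static long millisWithoutJoin(long delayMs) {
        long start = System.nanoTime();
        slowGet(delayMs); // returned immediately; the future is left pending
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    /** Time the caller spends when it joins the future, as the test does. */
    static long millisWithJoin(long delayMs) {
        long start = System.nanoTime();
        slowGet(delayMs).join(); // the calling thread waits here until completion
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("without join: " + millisWithoutJoin(500) + " ms");
        System.out.println("with join:    " + millisWithJoin(500) + " ms");
    }
}
```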
##########
File path:
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
##########
@@ -261,12 +261,17 @@ public void invalidate(String path) {
}
@Override
- public void refresh(String path) {
+ public CompletableFuture<Void> refresh(String path) {
// Refresh object of path if only it is cached before.
- if (objCache.getIfPresent(path) != null) {
- objCache.synchronous().invalidate(path);
- objCache.synchronous().refresh(path);
- }
+ CompletableFuture<Void> promise = new CompletableFuture<>();
+ this.store.getMetadataCacheScheduler().executeOrdered(path, () -> {
+ if (objCache.getIfPresent(path) != null) {
+ objCache.synchronous().invalidate(path);
+ objCache.synchronous().refresh(path);
+ }
+ promise.complete(null);
+ });
+ return promise;
Review comment:
Good point! I ran the test with this approach and it works. Thanks,
@ben-manes, for the great suggestion.
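The pattern in the diff above (run the blocking work on a per-key ordered executor and hand the caller a future) can be sketched in plain Java as follows. This is not the scheduler used in the PR: `OrderedRefreshSketch` is a hypothetical class that pins each key to one single-threaded worker by hash, and the exceptional-completion branch is an addition that the diff does not show.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class OrderedRefreshSketch {
    private final ExecutorService[] workers;

    OrderedRefreshSketch(int threads) {
        workers = new ExecutorService[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** The same key always maps to the same worker, so tasks for one key run in submission order. */
    CompletableFuture<Void> executeOrdered(String key, Runnable task) {
        CompletableFuture<Void> promise = new CompletableFuture<>();
        int idx = Math.floorMod(key.hashCode(), workers.length);
        workers[idx].execute(() -> {
            try {
                task.run(); // blocking work happens off the caller's thread
                promise.complete(null);
            } catch (Throwable t) {
                promise.completeExceptionally(t); // assumption: surface failures to the caller
            }
        });
        return promise; // the caller gets the future right away
    }

    void shutdown() {
        for (ExecutorService w : workers) {
            w.shutdown();
        }
    }

    /** Submits five tasks for one key and returns the order in which they ran. */
    static List<Integer> demo() {
        OrderedRefreshSketch scheduler = new OrderedRefreshSketch(4);
        List<Integer> seen = new CopyOnWriteArrayList<>();
        CompletableFuture<?>[] futures = new CompletableFuture<?>[5];
        for (int i = 0; i < 5; i++) {
            final int n = i;
            futures[i] = scheduler.executeOrdered("/key", () -> seen.add(n));
        }
        CompletableFuture.allOf(futures).join();
        scheduler.shutdown();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because every task for a given path lands on the same worker, an invalidate-then-refresh pair for that path cannot interleave with another refresh of the same path, while different paths can still proceed in parallel.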