poorbarcode commented on code in PR #20303:
URL: https://github.com/apache/pulsar/pull/20303#discussion_r1192424212


##########
pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java:
##########
@@ -425,6 +428,75 @@ public void testDeleteUnusedDirectories(String provider, 
Supplier<String> urlSup
         assertFalse(store.exists(prefix).join());
     }
 
+    @DataProvider(name = "conditionOfSwitchThread")
+    public Object[][] conditionOfSwitchThread(){
+        return new Object[][]{
+            {false, false},
+            {false, true},
+            {true, false},
+            {true, true}
+        };
+    }
+
+    @Test(dataProvider = "conditionOfSwitchThread")
+    public void testThreadSwitchOfZkMetadataStore(boolean hasSynchronizer, 
boolean enabledBatch) throws Exception {
+        final String prefix = newKey();
+        final String metadataStoreName = 
UUID.randomUUID().toString().replaceAll("-", "");
+        MetadataStoreConfig.MetadataStoreConfigBuilder builder =
+                
MetadataStoreConfig.builder().metadataStoreName(metadataStoreName);
+        builder.fsyncEnable(false);
+        builder.batchingEnabled(enabledBatch);
+        if (!hasSynchronizer) {
+            builder.synchronizer(null);
+        }
+        MetadataStoreConfig config = builder.build();
+        @Cleanup
+        ZKMetadataStore store = (ZKMetadataStore) 
MetadataStoreFactory.create(zks.getConnectionString(), config);
+
+        final Runnable verify = () -> {
+            String currentThreadName = Thread.currentThread().getName();
+            String errorMessage = String.format("Expect to switch to thread 
%s, but currently it is thread %s",
+                    metadataStoreName, currentThreadName);
+            if 
(!Thread.currentThread().getName().startsWith(metadataStoreName)){
+                throw new RuntimeException(errorMessage);
+            }
+        };
+
+        // put with node which has parent(but the parent node is not exists).
+        store.put(prefix + "/a1/b1/c1", "value".getBytes(), 
Optional.of(-1L)).thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // put.
+        store.put(prefix + "/b1", "value".getBytes(), 
Optional.of(-1L)).thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // get.
+        store.get(prefix + "/b1").thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // get the node which is not exists.
+        store.get(prefix + "/non").thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // delete.
+        store.delete(prefix + "/b1", Optional.empty()).thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // delete the node which is not exists.
+        store.delete(prefix + "/non", Optional.empty()).thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).exceptionally(ex -> {
+            verify.run();
+            return null;
+        }).join();
+    }

Review Comment:
   > Could you add a nested request case? So like:
   
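   No, this will cause a deadlock. The metadata store thread pool is a 
single-threaded pool, so a nested request that blocks inside a callback 
would wait on the only thread that can complete it.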
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to