lhotari commented on a change in pull request #13043:
URL: https://github.com/apache/pulsar/pull/13043#discussion_r759679497



##########
File path: pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
##########
@@ -128,105 +143,229 @@ protected void receivedSessionEvent(SessionEvent event) {
     }
 
     @Override
-    public CompletableFuture<Optional<GetResult>> storeGet(String path) {
-        CompletableFuture<Optional<GetResult>> future = new CompletableFuture<>();
-
+    protected void batchOperation(List<MetadataOp> ops) {
         try {
-            zkc.getData(path, null, (rc, path1, ctx, data, stat) -> {
-                execute(() -> {
+            zkc.multi(ops.stream().map(this::convertOp).collect(Collectors.toList()), (rc, path, ctx, results) -> {
+                if (results == null ) {
                     Code code = Code.get(rc);
-                    if (code == Code.OK) {
-                        future.complete(Optional.of(new GetResult(data, getStat(path1, stat))));
-                    } else if (code == Code.NONODE) {
-                        future.complete(Optional.empty());
+                    if (code == Code.CONNECTIONLOSS) {
+                        // There is the chance that we caused a connection reset by sending or requesting a batch
+                        // that passed the max ZK limit. Retry with the individual operations
+                        executor.schedule(() -> {
+                            ops.forEach(o -> batchOperation(Collections.singletonList(o)));
+                        }, 100, TimeUnit.MILLISECONDS);
                     } else {
-                        future.completeExceptionally(getException(code, path));
+                        MetadataStoreException e = getException(code, path);
+                        ops.forEach(o -> o.getFuture().completeExceptionally(e));
                     }
-                }, future);
+                    return;
+                }
+
+                // Trigger all the futures in the batch
+                for (int i = 0; i < ops.size(); i++) {
+                    OpResult opr = results.get(i);
+                    MetadataOp op = ops.get(i);
+
+                    switch (op.getType()) {
+                        case PUT:
+                            handlePutResult(op.asPut(), opr);
+                            break;
+                        case DELETE:
+                            handleDeleteResult(op.asDelete(), opr);
+                            break;
+                        case GET:
+                            handleGetResult(op.asGet(), opr);
+                            break;
+                        case GET_CHILDREN:
+                            handleGetChildrenResult(op.asGetChildren(), opr);
+                            break;
+
+                        default:
+                            op.getFuture().completeExceptionally(new IllegalStateException(
+                                    "Operation type not supported in multi: " + op.getType()));
+                    }
+                }
             }, null);
         } catch (Throwable t) {
-            future.completeExceptionally(new MetadataStoreException(t));
+            ops.forEach(o -> o.getFuture().completeExceptionally(t));

Review comment:
       Just wondering whether t should be wrapped once in a MetadataStoreException
       here, to be consistent with the removed code, which completed the future
       with new MetadataStoreException(t).
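
The suggestion could look roughly like the sketch below: build one wrapping exception outside the loop and complete every pending future with that same instance. This is only an illustration under stated assumptions, not the PR's code. `Op` and the nested `MetadataStoreException` are hypothetical stand-ins for `MetadataOp` and Pulsar's real exception class, and `failAll` is a made-up helper name.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class WrapOnceSketch {
    // Hypothetical stand-in for Pulsar's MetadataStoreException (assumption:
    // all that matters here is a constructor that takes the cause).
    static class MetadataStoreException extends Exception {
        MetadataStoreException(Throwable cause) {
            super(cause);
        }
    }

    // Hypothetical stand-in for MetadataOp: only the future is modelled.
    static class Op {
        private final CompletableFuture<Void> future = new CompletableFuture<>();

        CompletableFuture<Void> getFuture() {
            return future;
        }
    }

    // Wrap the throwable once, then fail every operation in the batch
    // with the same exception instance.
    static void failAll(List<Op> ops, Throwable t) {
        MetadataStoreException e = new MetadataStoreException(t);
        ops.forEach(o -> o.getFuture().completeExceptionally(e));
    }

    public static void main(String[] args) {
        List<Op> ops = List.of(new Op(), new Op());
        failAll(ops, new IllegalStateException("simulated failure"));

        // handle() on the source future receives the exception it was completed with.
        Throwable first = ops.get(0).getFuture().handle((v, ex) -> ex).join();
        Throwable second = ops.get(1).getFuture().handle((v, ex) -> ex).join();
        System.out.println("same instance: " + (first == second));
        System.out.println("cause: " + first.getCause());
    }
}
```

Wrapping outside the `forEach` keeps callers seeing a single exception type from the store, as they did before the change, and avoids allocating one wrapper per operation.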





