merlimat commented on a change in pull request #13043:
URL: https://github.com/apache/pulsar/pull/13043#discussion_r759683779
##########
File path:
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
##########
@@ -128,105 +143,229 @@ protected void receivedSessionEvent(SessionEvent event)
{
}
@Override
- public CompletableFuture<Optional<GetResult>> storeGet(String path) {
- CompletableFuture<Optional<GetResult>> future = new
CompletableFuture<>();
-
+ protected void batchOperation(List<MetadataOp> ops) {
try {
- zkc.getData(path, null, (rc, path1, ctx, data, stat) -> {
- execute(() -> {
+
zkc.multi(ops.stream().map(this::convertOp).collect(Collectors.toList()), (rc,
path, ctx, results) -> {
+ if (results == null ) {
Code code = Code.get(rc);
- if (code == Code.OK) {
- future.complete(Optional.of(new GetResult(data,
getStat(path1, stat))));
- } else if (code == Code.NONODE) {
- future.complete(Optional.empty());
+ if (code == Code.CONNECTIONLOSS) {
+ // There is the chance that we caused a connection
reset by sending or requesting a batch
+ // that passed the max ZK limit. Retry with the
individual operations
+ executor.schedule(() -> {
+ ops.forEach(o ->
batchOperation(Collections.singletonList(o)));
+ }, 100, TimeUnit.MILLISECONDS);
} else {
- future.completeExceptionally(getException(code, path));
+ MetadataStoreException e = getException(code, path);
+ ops.forEach(o ->
o.getFuture().completeExceptionally(e));
}
- }, future);
+ return;
+ }
+
+ // Trigger all the futures in the batch
+ for (int i = 0; i < ops.size(); i++) {
+ OpResult opr = results.get(i);
+ MetadataOp op = ops.get(i);
+
+ switch (op.getType()) {
+ case PUT:
+ handlePutResult(op.asPut(), opr);
+ break;
+ case DELETE:
+ handleDeleteResult(op.asDelete(), opr);
+ break;
+ case GET:
+ handleGetResult(op.asGet(), opr);
+ break;
+ case GET_CHILDREN:
+ handleGetChildrenResult(op.asGetChildren(), opr);
+ break;
+
+ default:
+ op.getFuture().completeExceptionally(new
IllegalStateException(
+ "Operation type not supported in multi: "
+ op.getType()));
+ }
+ }
}, null);
} catch (Throwable t) {
- future.completeExceptionally(new MetadataStoreException(t));
+ ops.forEach(o -> o.getFuture().completeExceptionally(t));
Review comment:
👍 Yes, it makes sense to wrap it
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]