lhotari commented on code in PR #24427: URL: https://github.com/apache/pulsar/pull/24427#discussion_r2298074650
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java: ########## @@ -178,92 +190,257 @@ public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName, log.info("Skip delete topic-level policies because {} has been removed before", changeEvents); return CompletableFuture.completedFuture(null); } - return sendTopicPolicyEvent(topicName, ActionType.DELETE, null, - keepGlobalPolicies); + // delete local policy + return updateTopicPoliciesAsync(topicName, null, false, ActionType.DELETE, true) + .thenCompose(__ -> + // delete global policy + updateTopicPoliciesAsync(topicName, null, true, ActionType.DELETE, true)); }); } @Override - public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies) { + public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, + boolean isGlobalPolicy, + boolean skipUpdateWhenTopicPolicyDoesntExist, + Consumer<TopicPolicies> policyUpdater) { if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) { return CompletableFuture.failedFuture(new BrokerServiceException.NotAllowedException( "Not allowed to update topic policy for the heartbeat topic")); } - return sendTopicPolicyEvent(topicName, ActionType.UPDATE, policies, false); + return updateTopicPoliciesAsync(topicName, policyUpdater, isGlobalPolicy, ActionType.UPDATE, + skipUpdateWhenTopicPolicyDoesntExist); } - private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, ActionType actionType, - @Nullable TopicPolicies policies, boolean keepGlobalPoliciesAfterDeleting) { - return pulsarService.getPulsarResources().getNamespaceResources() - .getPoliciesAsync(topicName.getNamespaceObject()) - .thenCompose(namespacePolicies -> { - if (namespacePolicies.isPresent() && namespacePolicies.get().deleted) { - log.debug("[{}] skip sending topic policy event since the namespace is deleted", topicName); - return CompletableFuture.completedFuture(null); - } - - try { - createSystemTopicFactoryIfNeeded(); - } catch (PulsarServerException e) { - return CompletableFuture.failedFuture(e); - } - CompletableFuture<Void> result = new CompletableFuture<>(); - writerCaches.get(topicName.getNamespaceObject()) - .whenComplete((writer, cause) -> { - if (cause != null) { - writerCaches.synchronous().invalidate(topicName.getNamespaceObject()); - result.completeExceptionally(cause); - } else { - CompletableFuture<MessageId> writeFuture = - sendTopicPolicyEventInternal(topicName, actionType, writer, policies, - keepGlobalPoliciesAfterDeleting); - writeFuture.whenComplete((messageId, e) -> { - if (e != null) { - result.completeExceptionally(e); - } else { - if (messageId != null) { - result.complete(null); + private CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, + Consumer<TopicPolicies> policyUpdater, + boolean isGlobalPolicy, + ActionType actionType, + boolean skipUpdateWhenTopicPolicyDoesntExist) { + if (closed.get()) { + return CompletableFuture.failedFuture(new BrokerServiceException(getClass().getName() + " is closed.")); + } + TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName()); + Pair<TopicName, Boolean> sequencerKey = Pair.of(partitionedTopicName, isGlobalPolicy); + + CompletableFuture<Void> operationFuture = new CompletableFuture<>(); + + // Chain the operation on the sequencer for the specific topic and policy type + topicPolicyUpdateSequencer.compute(sequencerKey, (key, existingFuture) -> { + CompletableFuture<Void> chain = (existingFuture == null || existingFuture.isDone()) + ? CompletableFuture.completedFuture(null) + : existingFuture; + + return chain.thenCompose(v -> + pulsarService.getPulsarResources().getNamespaceResources() + .getPoliciesAsync(topicName.getNamespaceObject()) + .thenCompose(namespacePolicies -> { + if (namespacePolicies.isPresent() && namespacePolicies.get().deleted) { + log.debug("[{}] skip sending topic policy event since the namespace is deleted", + topicName); + return CompletableFuture.completedFuture(null); + } + return getTopicPoliciesAsync(partitionedTopicName, + isGlobalPolicy ? GetType.GLOBAL_ONLY : GetType.LOCAL_ONLY) + .thenCompose(currentPolicies -> { + if (currentPolicies.isEmpty() && skipUpdateWhenTopicPolicyDoesntExist) { + log.debug("[{}] No existing policies, skipping sending event as " + + "requested", topicName); + return CompletableFuture.completedFuture(null); + } + TopicPolicies policiesToUpdate; + if (actionType == ActionType.DELETE) { + policiesToUpdate = null; // For delete, policies object is null } else { - result.completeExceptionally( - new RuntimeException("Got message id is null.")); + policiesToUpdate = currentPolicies.isEmpty() + ? createTopicPolicies(isGlobalPolicy) + : currentPolicies.get().clone(); + policyUpdater.accept(policiesToUpdate); } - } - }); - } - }); - return result; - }); + return sendTopicPolicyEventInternal(topicName, actionType, policiesToUpdate, + isGlobalPolicy); + }) + .thenCompose(messageId -> { + if (messageId == null) { + return CompletableFuture.completedFuture(null); + } else { + // asynchronously wait until the message ID is read by the reader + return untilMessageIdHasBeenRead(topicName.getNamespaceObject(), + messageId); + } + }); + })); + }).whenComplete((res, ex) -> { + // remove the current future from the sequencer map, if it is done + // this would remove the future from the sequencer map when the last operation completes in the chained + // future + topicPolicyUpdateSequencer.compute(sequencerKey, (key, existingFuture) -> { + if (existingFuture != null && existingFuture.isDone()) { + // Remove the completed future from the sequencer map + return null; + } + return existingFuture; + }); + if (ex != null) { + writerCaches.synchronous().invalidate(topicName.getNamespaceObject()); + operationFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex)); + } else { + operationFuture.complete(res); + } + }); + return operationFuture; + } + + /** + * Asynchronously waits until the message ID has been read by the reader. + * This ensures that the write operation has been fully processed and the changes are effective. + * @param namespaceObject the namespace object for which the message ID is being tracked + * @param messageId the message ID to wait for being handled + * @return a CompletableFuture that completes when the message ID has been read by the reader + */ + private CompletableFuture<Void> untilMessageIdHasBeenRead(NamespaceName namespaceObject, MessageId messageId) { + CompletableFuture<Void> future = new CompletableFuture<>(); + getMessageHandlerTracker(namespaceObject).addPendingFuture((MessageIdAdv) messageId, future); + return future; + } + + private TopicPolicyMessageHandlerTracker getMessageHandlerTracker(NamespaceName namespaceObject) { + return topicPolicyMessageHandlerTrackers.computeIfAbsent(namespaceObject, + ns -> new TopicPolicyMessageHandlerTracker()); + } + + private record PendingMessageFuture(MessageId messageId, CompletableFuture<Void> future) + implements Comparable<PendingMessageFuture> { + @Override + public int compareTo(PendingMessageFuture o) { + return messageId.compareTo(o.messageId); + } + } + + /** + * This tracks the last handled message IDs for each partition of the topic policies topic and + * pending futures for topic policy messages. Each namespace has its own tracker instance since + * this is tracking the per-namespace __change_events topic. + * The purpose for this tracker is to ensure that write operations on topic policies don't complete before the topic + * policies message has been read by the reader and effective. + */ + private static class TopicPolicyMessageHandlerTracker implements AutoCloseable { + private final List<MessageIdAdv> lastHandledMessageIds = new ArrayList<>(); + private final List<PriorityQueue<PendingMessageFuture>> pendingFutures = new ArrayList<>(); + private boolean closed = false; + + /** + * Called after a message ID has been handled by the reader. + * This will update the last handled message ID for the partition and complete any pending futures that are + * registered to the handled message ID or before it. + * @param messageId the message ID that has been handled + */ + public synchronized void handleMessageId(MessageIdAdv messageId) { + if (closed) { + return; + } + int partitionIndex = messageId.getPartitionIndex(); + if (partitionIndex < 0) { + partitionIndex = 0; + } + while (lastHandledMessageIds.size() <= partitionIndex) { + lastHandledMessageIds.add(null); + } + lastHandledMessageIds.set(partitionIndex, messageId); + if (pendingFutures.size() > partitionIndex) { + PriorityQueue<PendingMessageFuture> pq = pendingFutures.get(partitionIndex); + while (!pq.isEmpty() && pq.peek().messageId.compareTo(messageId) <= 0) { + PendingMessageFuture pendingFuture = pq.poll(); + completeFuture(pendingFuture.messageId(), pendingFuture.future()); + } + } + } + + /** + * Adds a pending future for a message ID. If the message ID is already handled, the future will be completed + * immediately. + * @param messageId the message ID to add the future for + * @param future the future to complete when the message ID is handled + */ + public synchronized void addPendingFuture(MessageIdAdv messageId, CompletableFuture<Void> future) { + if (closed) { + completeFuture(messageId, future); + return; + } + int partitionIndex = messageId.getPartitionIndex(); + if (partitionIndex < 0) { + partitionIndex = 0; + } + while (pendingFutures.size() <= partitionIndex) { + pendingFutures.add(new PriorityQueue<>()); + } + MessageIdAdv lastHandledMessageId = + lastHandledMessageIds.size() > partitionIndex ? lastHandledMessageIds.get(partitionIndex) : null; + if (lastHandledMessageId != null && lastHandledMessageId.compareTo(messageId) >= 0) { + // If the messageId is already handled, complete the future immediately + completeFuture(messageId, future); + return; + } + pendingFutures.get(partitionIndex).add(new PendingMessageFuture(messageId, future)); + } + + @Override + public synchronized void close() { + if (!closed) { + closed = true; + for (PriorityQueue<PendingMessageFuture> pq : pendingFutures) { + while (!pq.isEmpty()) { + PendingMessageFuture pendingFuture = pq.poll(); + completeFuture(pendingFuture.messageId(), pendingFuture.future()); + } + } + pendingFutures.clear(); + lastHandledMessageIds.clear(); + } + } + + private void completeFuture(MessageId messageId, CompletableFuture<Void> future) { + try { + future.complete(null); + } catch (Exception ex) { + log.error("Failed to complete pending future for message id {}.", messageId, ex); + } + } + } + + + private static TopicPolicies createTopicPolicies(boolean isGlobalPolicy) { + TopicPolicies topicPolicies = new TopicPolicies(); + topicPolicies.setIsGlobal(isGlobalPolicy); + return topicPolicies; } private CompletableFuture<MessageId> sendTopicPolicyEventInternal(TopicName topicName, ActionType actionType, - SystemTopicClient.Writer<PulsarEvent> writer, @Nullable TopicPolicies policies, - boolean keepGlobalPoliciesAfterDeleting) { - PulsarEvent event = getPulsarEvent(topicName, actionType, policies); - if (!ActionType.DELETE.equals(actionType)) { - return writer.writeAsync(getEventKey(event, policies != null && policies.isGlobalPolicies()), event); - } - // When a topic is deleting, delete both non-global and global topic-level policies. - CompletableFuture<MessageId> dealWithGlobalPolicy; - if (keepGlobalPoliciesAfterDeleting) { - dealWithGlobalPolicy = CompletableFuture.completedFuture(null); - } else { - dealWithGlobalPolicy = writer.deleteAsync(getEventKey(event, true), event); - } - CompletableFuture<MessageId> deletePolicies = dealWithGlobalPolicy - .thenCompose(__ -> { - return writer.deleteAsync(getEventKey(event, false), event); - }); - deletePolicies.exceptionally(ex -> { - log.error("Failed to delete topic policy [{}] error.", topicName, ex); - return null; - }); - return deletePolicies; + @Nullable TopicPolicies policies, + boolean isGlobalPolicy) { + return writerCaches.get(topicName.getNamespaceObject()) Review Comment: Resolved this and added a comment about this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org