lhotari commented on code in PR #24427:
URL: https://github.com/apache/pulsar/pull/24427#discussion_r2298074650


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -178,92 +190,257 @@ public CompletableFuture<Void> 
deleteTopicPoliciesAsync(TopicName topicName,
                 log.info("Skip delete topic-level policies because {} has been 
removed before", changeEvents);
                 return CompletableFuture.completedFuture(null);
             }
-            return sendTopicPolicyEvent(topicName, ActionType.DELETE, null,
-                    keepGlobalPolicies);
+            // delete local policy
+            return updateTopicPoliciesAsync(topicName, null, false, 
ActionType.DELETE, true)
+                    .thenCompose(__ ->
+                            // delete global policy
+                            updateTopicPoliciesAsync(topicName, null, true, 
ActionType.DELETE, true));
         });
     }
 
     @Override
-    public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName 
topicName, TopicPolicies policies) {
+    public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName 
topicName,
+                                                            boolean 
isGlobalPolicy,
+                                                            boolean 
skipUpdateWhenTopicPolicyDoesntExist,
+                                                            
Consumer<TopicPolicies> policyUpdater) {
         if 
(NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
             return CompletableFuture.failedFuture(new 
BrokerServiceException.NotAllowedException(
                     "Not allowed to update topic policy for the heartbeat 
topic"));
         }
-        return sendTopicPolicyEvent(topicName, ActionType.UPDATE, policies, 
false);
+        return updateTopicPoliciesAsync(topicName, policyUpdater, 
isGlobalPolicy, ActionType.UPDATE,
+                skipUpdateWhenTopicPolicyDoesntExist);
     }
 
-    private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, 
ActionType actionType,
-         @Nullable TopicPolicies policies, boolean 
keepGlobalPoliciesAfterDeleting) {
-        return pulsarService.getPulsarResources().getNamespaceResources()
-                .getPoliciesAsync(topicName.getNamespaceObject())
-                .thenCompose(namespacePolicies -> {
-                    if (namespacePolicies.isPresent() && 
namespacePolicies.get().deleted) {
-                        log.debug("[{}] skip sending topic policy event since 
the namespace is deleted", topicName);
-                        return CompletableFuture.completedFuture(null);
-                    }
-
-                    try {
-                        createSystemTopicFactoryIfNeeded();
-                    } catch (PulsarServerException e) {
-                        return CompletableFuture.failedFuture(e);
-                    }
-                    CompletableFuture<Void> result = new CompletableFuture<>();
-                    writerCaches.get(topicName.getNamespaceObject())
-                            .whenComplete((writer, cause) -> {
-                                if (cause != null) {
-                                    
writerCaches.synchronous().invalidate(topicName.getNamespaceObject());
-                                    result.completeExceptionally(cause);
-                                } else {
-                                    CompletableFuture<MessageId> writeFuture =
-                                            
sendTopicPolicyEventInternal(topicName, actionType, writer, policies,
-                                                    
keepGlobalPoliciesAfterDeleting);
-                                    writeFuture.whenComplete((messageId, e) -> 
{
-                                        if (e != null) {
-                                            result.completeExceptionally(e);
-                                        } else {
-                                            if (messageId != null) {
-                                                result.complete(null);
+    private CompletableFuture<Void> updateTopicPoliciesAsync(TopicName 
topicName,
+                                                             
Consumer<TopicPolicies> policyUpdater,
+                                                             boolean 
isGlobalPolicy,
+                                                             ActionType 
actionType,
+                                                             boolean 
skipUpdateWhenTopicPolicyDoesntExist) {
+        if (closed.get()) {
+            return CompletableFuture.failedFuture(new 
BrokerServiceException(getClass().getName() + " is closed."));
+        }
+        TopicName partitionedTopicName = 
TopicName.get(topicName.getPartitionedTopicName());
+        Pair<TopicName, Boolean> sequencerKey = Pair.of(partitionedTopicName, 
isGlobalPolicy);
+
+        CompletableFuture<Void> operationFuture = new CompletableFuture<>();
+
+        // Chain the operation on the sequencer for the specific topic and 
policy type
+        topicPolicyUpdateSequencer.compute(sequencerKey, (key, existingFuture) 
-> {
+            CompletableFuture<Void> chain = (existingFuture == null || 
existingFuture.isDone())
+                    ? CompletableFuture.completedFuture(null)
+                    : existingFuture;
+
+            return chain.thenCompose(v ->
+                    pulsarService.getPulsarResources().getNamespaceResources()
+                            .getPoliciesAsync(topicName.getNamespaceObject())
+                            .thenCompose(namespacePolicies -> {
+                                if (namespacePolicies.isPresent() && 
namespacePolicies.get().deleted) {
+                                    log.debug("[{}] skip sending topic policy 
event since the namespace is deleted",
+                                            topicName);
+                                    return 
CompletableFuture.completedFuture(null);
+                                }
+                                return 
getTopicPoliciesAsync(partitionedTopicName,
+                                        isGlobalPolicy ? GetType.GLOBAL_ONLY : 
GetType.LOCAL_ONLY)
+                                        .thenCompose(currentPolicies -> {
+                                            if (currentPolicies.isEmpty() && 
skipUpdateWhenTopicPolicyDoesntExist) {
+                                                log.debug("[{}] No existing 
policies, skipping sending event as "
+                                                        + "requested", 
topicName);
+                                                return 
CompletableFuture.completedFuture(null);
+                                            }
+                                            TopicPolicies policiesToUpdate;
+                                            if (actionType == 
ActionType.DELETE) {
+                                                policiesToUpdate = null; // 
For delete, policies object is null
                                             } else {
-                                                result.completeExceptionally(
-                                                        new 
RuntimeException("Got message id is null."));
+                                                policiesToUpdate = 
currentPolicies.isEmpty()
+                                                        ? 
createTopicPolicies(isGlobalPolicy)
+                                                        : 
currentPolicies.get().clone();
+                                                
policyUpdater.accept(policiesToUpdate);
                                             }
-                                        }
-                                    });
-                            }
-                    });
-                    return result;
-                });
+                                            return 
sendTopicPolicyEventInternal(topicName, actionType, policiesToUpdate,
+                                                    isGlobalPolicy);
+                                        })
+                                        .thenCompose(messageId -> {
+                                            if (messageId == null) {
+                                                return 
CompletableFuture.completedFuture(null);
+                                            } else {
+                                                // asynchronously wait until 
the message ID is read by the reader
+                                                return 
untilMessageIdHasBeenRead(topicName.getNamespaceObject(),
+                                                        messageId);
+                                            }
+                                        });
+                            }));
+        }).whenComplete((res, ex) -> {
+            // remove the current future from the sequencer map, if it is done
+            // this would remove the future from the sequencer map when the 
last operation completes in the chained
+            // future
+            topicPolicyUpdateSequencer.compute(sequencerKey, (key, 
existingFuture) -> {
+                if (existingFuture != null && existingFuture.isDone()) {
+                    // Remove the completed future from the sequencer map
+                    return null;
+                }
+                return existingFuture;
+            });
+            if (ex != null) {
+                
writerCaches.synchronous().invalidate(topicName.getNamespaceObject());
+                
operationFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
+            } else {
+                operationFuture.complete(res);
+            }
+        });
+        return operationFuture;
+    }
+
+    /**
+     * Asynchronously waits until the message ID has been read by the reader.
+     * This ensures that the write operation has been fully processed and the 
changes are effective.
+     * @param namespaceObject the namespace object for which the message ID is 
being tracked
+     * @param messageId the message ID to wait for being handled
+     * @return a CompletableFuture that completes when the message ID has been 
read by the reader
+     */
+    private CompletableFuture<Void> untilMessageIdHasBeenRead(NamespaceName 
namespaceObject, MessageId messageId) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        
getMessageHandlerTracker(namespaceObject).addPendingFuture((MessageIdAdv) 
messageId, future);
+        return future;
+    }
+
+    private TopicPolicyMessageHandlerTracker 
getMessageHandlerTracker(NamespaceName namespaceObject) {
+        return 
topicPolicyMessageHandlerTrackers.computeIfAbsent(namespaceObject,
+                ns -> new TopicPolicyMessageHandlerTracker());
+    }
+
+    private record PendingMessageFuture(MessageId messageId, 
CompletableFuture<Void> future)
+            implements Comparable<PendingMessageFuture> {
+        @Override
+        public int compareTo(PendingMessageFuture o) {
+            return messageId.compareTo(o.messageId);
+        }
+    }
+
+    /**
+     * This tracks the last handled message IDs for each partition of the 
topic policies topic and
+     * pending futures for topic policy messages. Each namespace has its own 
tracker instance since
+     * this is tracking the per-namespace __change_events topic.
+     * The purpose for this tracker is to ensure that write operations on 
topic policies don't complete before the topic
+     * policies message has been read by the reader and effective.
+     */
+    private static class TopicPolicyMessageHandlerTracker implements 
AutoCloseable {
+        private final List<MessageIdAdv> lastHandledMessageIds = new 
ArrayList<>();
+        private final List<PriorityQueue<PendingMessageFuture>> pendingFutures 
= new ArrayList<>();
+        private boolean closed = false;
+
+        /**
+         * Called after a message ID has been handled by the reader.
+         * This will update the last handled message ID for the partition and 
complete any pending futures that are
+         * registered to the handled message ID or before it.
+         * @param messageId the message ID that has been handled
+         */
+        public synchronized void handleMessageId(MessageIdAdv messageId) {
+            if (closed) {
+                return;
+            }
+            int partitionIndex = messageId.getPartitionIndex();
+            if (partitionIndex < 0) {
+                partitionIndex = 0;
+            }
+            while (lastHandledMessageIds.size() <= partitionIndex) {
+                lastHandledMessageIds.add(null);
+            }
+            lastHandledMessageIds.set(partitionIndex, messageId);
+            if (pendingFutures.size() > partitionIndex) {
+                PriorityQueue<PendingMessageFuture> pq = 
pendingFutures.get(partitionIndex);
+                while (!pq.isEmpty() && 
pq.peek().messageId.compareTo(messageId) <= 0) {
+                    PendingMessageFuture pendingFuture = pq.poll();
+                    completeFuture(pendingFuture.messageId(), 
pendingFuture.future());
+                }
+            }
+        }
+
+        /**
+         * Adds a pending future for a message ID. If the message ID is 
already handled, the future will be completed
+         * immediately.
+         * @param messageId the message ID to add the future for
+         * @param future the future to complete when the message ID is handled
+         */
+        public synchronized void addPendingFuture(MessageIdAdv messageId, 
CompletableFuture<Void> future) {
+            if (closed) {
+                completeFuture(messageId, future);
+                return;
+            }
+            int partitionIndex = messageId.getPartitionIndex();
+            if (partitionIndex < 0) {
+                partitionIndex = 0;
+            }
+            while (pendingFutures.size() <= partitionIndex) {
+                pendingFutures.add(new PriorityQueue<>());
+            }
+            MessageIdAdv lastHandledMessageId =
+                    lastHandledMessageIds.size() > partitionIndex ? 
lastHandledMessageIds.get(partitionIndex) : null;
+            if (lastHandledMessageId != null && 
lastHandledMessageId.compareTo(messageId) >= 0) {
+                // If the messageId is already handled, complete the future 
immediately
+                completeFuture(messageId, future);
+                return;
+            }
+            pendingFutures.get(partitionIndex).add(new 
PendingMessageFuture(messageId, future));
+        }
+
+        @Override
+        public synchronized void close() {
+            if (!closed) {
+                closed = true;
+                for (PriorityQueue<PendingMessageFuture> pq : pendingFutures) {
+                    while (!pq.isEmpty()) {
+                        PendingMessageFuture pendingFuture = pq.poll();
+                        completeFuture(pendingFuture.messageId(), 
pendingFuture.future());
+                    }
+                }
+                pendingFutures.clear();
+                lastHandledMessageIds.clear();
+            }
+        }
+
+        private void completeFuture(MessageId messageId, 
CompletableFuture<Void> future) {
+            try {
+                future.complete(null);
+            } catch (Exception ex) {
+                log.error("Failed to complete pending future for message id 
{}.", messageId, ex);
+            }
+        }
+    }
+
+
+    private static TopicPolicies createTopicPolicies(boolean isGlobalPolicy) {
+        TopicPolicies topicPolicies = new TopicPolicies();
+        topicPolicies.setIsGlobal(isGlobalPolicy);
+        return topicPolicies;
     }
 
     private CompletableFuture<MessageId> 
sendTopicPolicyEventInternal(TopicName topicName, ActionType actionType,
-          SystemTopicClient.Writer<PulsarEvent> writer, @Nullable 
TopicPolicies policies,
-          boolean keepGlobalPoliciesAfterDeleting) {
-        PulsarEvent event = getPulsarEvent(topicName, actionType, policies);
-        if (!ActionType.DELETE.equals(actionType)) {
-            return writer.writeAsync(getEventKey(event, policies != null && 
policies.isGlobalPolicies()), event);
-        }
-        // When a topic is deleting, delete both non-global and global 
topic-level policies.
-        CompletableFuture<MessageId> dealWithGlobalPolicy;
-        if (keepGlobalPoliciesAfterDeleting) {
-            dealWithGlobalPolicy = CompletableFuture.completedFuture(null);
-        } else {
-            dealWithGlobalPolicy = writer.deleteAsync(getEventKey(event, 
true), event);
-        }
-        CompletableFuture<MessageId> deletePolicies = dealWithGlobalPolicy
-            .thenCompose(__ -> {
-                return writer.deleteAsync(getEventKey(event, false), event);
-            });
-        deletePolicies.exceptionally(ex -> {
-            log.error("Failed to delete topic policy [{}] error.", topicName, 
ex);
-            return null;
-        });
-        return deletePolicies;
+                                                                      
@Nullable TopicPolicies policies,
+                                                                      boolean 
isGlobalPolicy) {
+        return writerCaches.get(topicName.getNamespaceObject())

Review Comment:
   Resolved this and added a comment about it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to