This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new ecf24932dae [improve][admin] Improve partitioned-topic condition 
evaluation (#19015)
ecf24932dae is described below

commit ecf24932dae86685e8ff2c8ee730e7f625c6dfa8
Author: gaozhangmin <[email protected]>
AuthorDate: Mon Jan 30 13:45:21 2023 +0800

    [improve][admin] Improve partitioned-topic condition evaluation (#19015)
    
    Co-authored-by: gavingaozhangmin <[email protected]>
    (cherry picked from commit 3bab0997ffc71973c46abf229f61b6d24b6354e6)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 294 +++++++++++----------
 1 file changed, 156 insertions(+), 138 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index aed4c29e0ba..50dad31c71c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2544,80 +2544,87 @@ public class PersistentTopicsBase extends AdminResource 
{
     protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, 
String subName, boolean authoritative,
             MessageIdImpl messageId, boolean isExcluded, int batchIndex) {
         CompletableFuture<Void> ret;
-        if (topicName.isGlobal()) {
-            ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        // If the topic name is a partition name, no need to get partition 
topic metadata again
+        if (!topicName.isPartitioned()) {
+            ret = getPartitionedTopicMetadataAsync(topicName, authoritative, 
false)
+                    .thenCompose(topicMetadata -> {
+                        if (topicMetadata.partitions > 0) {
+                            log.warn("[{}] Not supported operation on 
partitioned-topic {} {}",
+                                    clientAppId(), topicName, subName);
+                            asyncResponse.resume(new 
RestException(Status.METHOD_NOT_ALLOWED,
+                                    "Reset-cursor at position is not allowed 
for partitioned-topic"));
+                        }
+                        return CompletableFuture.completedFuture(null);
+                    });
         } else {
             ret = CompletableFuture.completedFuture(null);
         }
-        ret.thenAccept(__ -> {
+
+        CompletableFuture<Void> future;
+        if (topicName.isGlobal()) {
+            future = ret.thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName));
+        } else {
+            future = CompletableFuture.completedFuture(null);
+        }
+        future.thenAccept(__ -> {
             log.info("[{}][{}] received reset cursor on subscription {} to 
position {}", clientAppId(), topicName,
                     subName, messageId);
-            // If the topic name is a partition name, no need to get partition 
topic metadata again
-            if (!topicName.isPartitioned()
-                    && getPartitionedTopicMetadata(topicName, authoritative, 
false).partitions > 0) {
-                log.warn("[{}] Not supported operation on partitioned-topic {} 
{}", clientAppId(), topicName,
-                        subName);
-                asyncResponse.resume(new 
RestException(Status.METHOD_NOT_ALLOWED,
-                        "Reset-cursor at position is not allowed for 
partitioned-topic"));
-                return;
-            } else {
-                validateTopicOwnershipAsync(topicName, authoritative)
-                        .thenCompose(ignore ->
-                                validateTopicOperationAsync(topicName, 
TopicOperation.RESET_CURSOR, subName))
-                        .thenCompose(ignore -> 
getTopicReferenceAsync(topicName))
-                        .thenAccept(topic -> {
-                                if (topic == null) {
-                                    asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
-                                            
getTopicNotFoundErrorMessage(topicName.toString())));
-                                    return;
-                                }
-                                PersistentSubscription sub = 
((PersistentTopic) topic).getSubscription(subName);
-                                if (sub == null) {
-                                    asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
-                                            
getSubNotFoundErrorMessage(topicName.toString(), subName)));
-                                    return;
+            validateTopicOwnershipAsync(topicName, authoritative)
+                    .thenCompose(ignore ->
+                            validateTopicOperationAsync(topicName, 
TopicOperation.RESET_CURSOR, subName))
+                    .thenCompose(ignore -> getTopicReferenceAsync(topicName))
+                    .thenAccept(topic -> {
+                        if (topic == null) {
+                            asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
+                                    
getTopicNotFoundErrorMessage(topicName.toString())));
+                            return;
+                        }
+                        PersistentSubscription sub = ((PersistentTopic) 
topic).getSubscription(subName);
+                        if (sub == null) {
+                            asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
+                                    
getSubNotFoundErrorMessage(topicName.toString(), subName)));
+                            return;
+                        }
+                        CompletableFuture<Integer> batchSizeFuture = new 
CompletableFuture<>();
+                        getEntryBatchSize(batchSizeFuture, (PersistentTopic) 
topic, messageId, batchIndex);
+                        batchSizeFuture.thenAccept(bi -> {
+                            PositionImpl seekPosition = 
calculatePositionAckSet(isExcluded, bi, batchIndex,
+                                    messageId);
+                            sub.resetCursor(seekPosition).thenRun(() -> {
+                                log.info("[{}][{}] successfully reset cursor 
on subscription {}"
+                                                + " to position {}", 
clientAppId(),
+                                        topicName, subName, messageId);
+                                
asyncResponse.resume(Response.noContent().build());
+                            }).exceptionally(ex -> {
+                                Throwable t = (ex instanceof 
CompletionException ? ex.getCause() : ex);
+                                log.warn("[{}][{}] Failed to reset cursor on 
subscription {}"
+                                                + " to position {}", 
clientAppId(),
+                                        topicName, subName, messageId, t);
+                                if (t instanceof 
SubscriptionInvalidCursorPosition) {
+                                    asyncResponse.resume(new 
RestException(Status.PRECONDITION_FAILED,
+                                            "Unable to find position for 
position specified: "
+                                                    + t.getMessage()));
+                                } else if (t instanceof 
SubscriptionBusyException) {
+                                    asyncResponse.resume(new 
RestException(Status.PRECONDITION_FAILED,
+                                            "Failed for Subscription Busy: " + 
t.getMessage()));
+                                } else {
+                                    
resumeAsyncResponseExceptionally(asyncResponse, t);
                                 }
-                                CompletableFuture<Integer> batchSizeFuture = 
new CompletableFuture<>();
-                                getEntryBatchSize(batchSizeFuture, 
(PersistentTopic) topic, messageId, batchIndex);
-                                batchSizeFuture.thenAccept(bi -> {
-                                    PositionImpl seekPosition = 
calculatePositionAckSet(isExcluded, bi, batchIndex,
-                                            messageId);
-                                    sub.resetCursor(seekPosition).thenRun(() 
-> {
-                                        log.info("[{}][{}] successfully reset 
cursor on subscription {}"
-                                                        + " to position {}", 
clientAppId(),
-                                                topicName, subName, messageId);
-                                        
asyncResponse.resume(Response.noContent().build());
-                                    }).exceptionally(ex -> {
-                                        Throwable t = (ex instanceof 
CompletionException ? ex.getCause() : ex);
-                                        log.warn("[{}][{}] Failed to reset 
cursor on subscription {}"
-                                                        + " to position {}", 
clientAppId(),
-                                                        topicName, subName, 
messageId, t);
-                                        if (t instanceof 
SubscriptionInvalidCursorPosition) {
-                                            asyncResponse.resume(new 
RestException(Status.PRECONDITION_FAILED,
-                                                    "Unable to find position 
for position specified: "
-                                                            + t.getMessage()));
-                                        } else if (t instanceof 
SubscriptionBusyException) {
-                                            asyncResponse.resume(new 
RestException(Status.PRECONDITION_FAILED,
-                                                    "Failed for Subscription 
Busy: " + t.getMessage()));
-                                        } else {
-                                            
resumeAsyncResponseExceptionally(asyncResponse, t);
-                                        }
-                                        return null;
-                                    });
-                                }).exceptionally(e -> {
-                                    asyncResponse.resume(e);
-                                    return null;
-                                });
-                        }).exceptionally(ex -> {
-                            // If the exception is not redirect exception we 
need to log it.
-                            if (!isRedirectException(ex)) {
-                                log.warn("[{}][{}] Failed to reset cursor on 
subscription {} to position {}",
-                                        clientAppId(), topicName, subName, 
messageId, ex.getCause());
-                            }
-                            resumeAsyncResponseExceptionally(asyncResponse, 
ex.getCause());
+                                return null;
+                            });
+                        }).exceptionally(e -> {
+                            asyncResponse.resume(e);
                             return null;
                         });
-                }
+                    }).exceptionally(ex -> {
+                        // If the exception is not redirect exception we need 
to log it.
+                        if (!isRedirectException(ex)) {
+                            log.warn("[{}][{}] Failed to reset cursor on 
subscription {} to position {}",
+                                    clientAppId(), topicName, subName, 
messageId, ex.getCause());
+                        }
+                        resumeAsyncResponseExceptionally(asyncResponse, 
ex.getCause());
+                        return null;
+                    });
         }).exceptionally(ex -> {
             // If the exception is not redirect exception we need to log it.
             if (!isRedirectException(ex)) {
@@ -3146,68 +3153,65 @@ public class PersistentTopicsBase extends AdminResource 
{
 
     protected void internalGetBacklogSizeByMessageId(AsyncResponse 
asyncResponse,
                                                      MessageIdImpl messageId, 
boolean authoritative) {
-        CompletableFuture<Void> future;
-        if (topicName.isGlobal()) {
-            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
-        } else {
-            future = CompletableFuture.completedFuture(null);
-        }
-        future.thenAccept(__ -> {
-            getPartitionedTopicMetadataAsync(topicName, authoritative, false)
-                    .thenAccept(partitionMetadata -> {
-                        if (!topicName.isPartitioned() && 
partitionMetadata.partitions > 0) {
+        CompletableFuture<Void> ret;
+        // If the topic name is a partition name, no need to get partition 
topic metadata again
+        if (!topicName.isPartitioned()) {
+            ret = getPartitionedTopicMetadataAsync(topicName, authoritative, 
false)
+                    .thenCompose(topicMetadata -> {
+                        if (topicMetadata.partitions > 0) {
                             log.warn("[{}] Not supported calculate backlog 
size operation on partitioned-topic {}",
                                     clientAppId(), topicName);
                             asyncResponse.resume(new 
RestException(Status.METHOD_NOT_ALLOWED,
                                     "calculate backlog size is not allowed for 
partitioned-topic"));
-                        } else {
-                            validateTopicOwnershipAsync(topicName, 
authoritative)
-                                    .thenCompose(unused -> 
validateTopicOperationAsync(topicName,
-                                            TopicOperation.GET_BACKLOG_SIZE))
-                                    .thenCompose(unused -> 
getTopicReferenceAsync(topicName))
-                                    .thenAccept(t -> {
-                                        PersistentTopic topic = 
(PersistentTopic) t;
-                                        PositionImpl pos = new 
PositionImpl(messageId.getLedgerId(),
-                                                messageId.getEntryId());
-                                        if (topic == null) {
-                                            asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
-                                                    
getTopicNotFoundErrorMessage(topicName.toString())));
-                                            return;
-                                        }
-                                        ManagedLedgerImpl managedLedger =
-                                                (ManagedLedgerImpl) 
topic.getManagedLedger();
-                                        if (messageId.getLedgerId() == -1) {
-                                            
asyncResponse.resume(managedLedger.getTotalSize());
-                                        } else {
-                                            
asyncResponse.resume(managedLedger.getEstimatedBacklogSize(pos));
-                                        }
-                                    }).exceptionally(ex -> {
-                                        // If the exception is not redirect 
exception we need to log it.
-                                        if (!isRedirectException(ex)) {
-                                            log.error("[{}] Failed to get 
backlog size for topic {}", clientAppId(),
-                                                    topicName, ex);
-                                        }
-                                        
resumeAsyncResponseExceptionally(asyncResponse, ex);
-                                        return null;
-                                    });
                         }
-                    }).exceptionally(ex -> {
+                        return CompletableFuture.completedFuture(null);
+                    });
+        } else {
+            ret = CompletableFuture.completedFuture(null);
+        }
+        CompletableFuture<Void> future;
+        if (topicName.isGlobal()) {
+            future = ret.thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName));
+        } else {
+            future = ret;
+        }
+        future.thenAccept(__ -> validateTopicOwnershipAsync(topicName, 
authoritative)
+                .thenCompose(unused -> validateTopicOperationAsync(topicName,
+                        TopicOperation.GET_BACKLOG_SIZE))
+                .thenCompose(unused -> getTopicReferenceAsync(topicName))
+                .thenAccept(t -> {
+                    PersistentTopic topic = (PersistentTopic) t;
+                    PositionImpl pos = new 
PositionImpl(messageId.getLedgerId(),
+                            messageId.getEntryId());
+                    if (topic == null) {
+                        asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
+                                
getTopicNotFoundErrorMessage(topicName.toString())));
+                        return;
+                    }
+                    ManagedLedgerImpl managedLedger =
+                            (ManagedLedgerImpl) topic.getManagedLedger();
+                    if (messageId.getLedgerId() == -1) {
+                        asyncResponse.resume(managedLedger.getTotalSize());
+                    } else {
+                        
asyncResponse.resume(managedLedger.getEstimatedBacklogSize(pos));
+                    }
+                }).exceptionally(ex -> {
+                    // If the exception is not redirect exception we need to 
log it.
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get backlog size for topic 
{}", clientAppId(),
+                                topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                })).exceptionally(ex -> {
                         // If the exception is not redirect exception we need 
to log it.
                         if (!isRedirectException(ex)) {
-                            log.error("[{}] Failed to get backlog size for 
topic {}", clientAppId(), topicName, ex);
+                            log.error("[{}] Failed to validate global 
namespace ownership "
+                                    + "to get backlog size for topic {}", 
clientAppId(), topicName, ex);
                         }
                         resumeAsyncResponseExceptionally(asyncResponse, ex);
                         return null;
-            });
-        }).exceptionally(ex -> {
-            // If the exception is not redirect exception we need to log it.
-            if (!isRedirectException(ex)) {
-                log.error("[{}] Failed to validate global namespace ownership 
to get backlog size for topic "
-                        + "{}", clientAppId(), topicName, ex);
-            }
-            resumeAsyncResponseExceptionally(asyncResponse, ex);
-            return null;
-        });
+                });
     }
 
     protected CompletableFuture<Void> 
internalSetBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType,
@@ -3667,6 +3671,14 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected void internalTerminatePartitionedTopic(AsyncResponse 
asyncResponse, boolean authoritative) {
+        if (topicName.isPartitioned()) {
+            String msg = "Termination of a non-partitioned topic is not 
allowed using partitioned-terminate"
+                    + ", please use terminate commands";
+            log.error("[{}] [{}] {}", clientAppId(), topicName, msg);
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, 
msg));
+            return;
+        }
+
         CompletableFuture<Void> future;
         if (topicName.isGlobal()) {
             future = validateGlobalNamespaceOwnershipAsync(namespaceName);
@@ -3896,11 +3908,33 @@ public class PersistentTopicsBase extends AdminResource 
{
 
     protected void internalExpireMessagesByPosition(AsyncResponse 
asyncResponse, String subName, boolean authoritative,
                                                  MessageIdImpl messageId, 
boolean isExcluded, int batchIndex) {
+        if (messageId.getPartitionIndex() != topicName.getPartitionIndex()) {
+            String msg = "Invalid parameter for expire message by position, 
partition index of "
+                    + "passed in message position doesn't match partition 
index for the topic";
+            log.warn("[{}] {} {}({}).", clientAppId(), msg, topicName, 
messageId);
+            asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, 
msg));
+            return;
+        }
+        CompletableFuture<Void> ret;
+        // If the topic name is a partition name, no need to get partition 
topic metadata again
+        if (!topicName.isPartitioned()) {
+            ret = getPartitionedTopicMetadataAsync(topicName, authoritative, 
false)
+                    .thenCompose(topicMetadata -> {
+                        if (topicMetadata.partitions > 0) {
+                            String msg = "Expire message at position is not 
supported for partitioned-topic";
+                            log.warn("[{}] {} {}({}) {}", clientAppId(), msg, 
topicName, messageId, subName);
+                            asyncResponse.resume(new 
RestException(Status.METHOD_NOT_ALLOWED, msg));
+                        }
+                        return CompletableFuture.completedFuture(null);
+                    });
+        } else {
+            ret = CompletableFuture.completedFuture(null);
+        }
         CompletableFuture<Void> future;
         if (topicName.isGlobal()) {
-            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+            future = ret.thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName));
         } else {
-            future = CompletableFuture.completedFuture(null);
+            future = ret;
         }
 
         future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
@@ -3908,24 +3942,8 @@ public class PersistentTopicsBase extends AdminResource {
                 .thenCompose(__ -> {
                     log.info("[{}][{}] received expire messages on 
subscription {} to position {}", clientAppId(),
                             topicName, subName, messageId);
-                    return getPartitionedTopicMetadataAsync(topicName, 
authoritative, false)
-                            .thenAccept(partitionMetadata -> {
-                                if (!topicName.isPartitioned() && 
partitionMetadata.partitions > 0) {
-                                    String msg = "Expire message at position 
is not supported for partitioned-topic";
-                                    log.warn("[{}] {} {}({}) {}", 
clientAppId(), msg, topicName, messageId, subName);
-                                    asyncResponse.resume(new 
RestException(Status.METHOD_NOT_ALLOWED, msg));
-                                    return;
-                                } else if (messageId.getPartitionIndex() != 
topicName.getPartitionIndex()) {
-                                    String msg = "Invalid parameter for expire 
message by position, partition index of "
-                                            + "passed in message position 
doesn't match partition index for the topic";
-                                    log.warn("[{}] {} {}({}).", clientAppId(), 
msg, topicName, messageId);
-                                    asyncResponse.resume(new 
RestException(Status.PRECONDITION_FAILED, msg));
-                                    return;
-                                } else {
-                                    
internalExpireMessagesNonPartitionedTopicByPosition(asyncResponse, subName,
-                                            messageId, isExcluded, batchIndex);
-                                }
-                            });
+                    return 
internalExpireMessagesNonPartitionedTopicByPosition(asyncResponse, subName,
+                            messageId, isExcluded, batchIndex);
                 }).exceptionally(ex -> {
                     // If the exception is not redirect exception we need to 
log it.
                     if (!isRedirectException(ex)) {

Reply via email to