lhotari commented on code in PR #19015:
URL: https://github.com/apache/pulsar/pull/19015#discussion_r1091886551


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2608,80 +2608,87 @@ protected void internalGetSubscriptionProperties(AsyncResponse asyncResponse, St
     protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String subName, boolean authoritative,
             MessageIdImpl messageId, boolean isExcluded, int batchIndex) {
         CompletableFuture<Void> ret;
-        if (topicName.isGlobal()) {
-            ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        // If the topic name is a partition name, no need to get partition topic metadata again
+        if (!topicName.isPartitioned()) {
+            ret = getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+                    .thenCompose(topicMetadata -> {
+                        if (topicMetadata.partitions > 0) {
+                            log.warn("[{}] Not supported operation on partitioned-topic {} {}",
+                                    clientAppId(), topicName, subName);
+                            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                                    "Reset-cursor at position is not allowed for partitioned-topic"));
+                        }
+                        return CompletableFuture.completedFuture(null);
+                    });
         } else {
             ret = CompletableFuture.completedFuture(null);
         }
-        ret.thenAccept(__ -> {
+
+        CompletableFuture<Void> future;
+        if (topicName.isGlobal()) {
+            future = ret.thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(namespaceName));
+        } else {
+            future = CompletableFuture.completedFuture(null);
+        }
+        future.thenAccept(__ -> {
             log.info("[{}][{}] received reset cursor on subscription {} to position {}", clientAppId(), topicName,
                     subName, messageId);
-            // If the topic name is a partition name, no need to get partition topic metadata again
-            if (!topicName.isPartitioned()
-                    && getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) {

Review Comment:
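   This fixes an issue where the ZooKeeper session dies because the EventThread is blocked.
   The removed synchronous `getPartitionedTopicMetadata` call ran inside the `thenAccept` callback,
   which is invoked on `main-EventThread`, so that thread parks in `CompletableFuture.get`
   and can no longer process ZooKeeper events.
   Stack trace: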
   ```
   "main-EventThread" #17 daemon prio=5 os_prio=0 cpu=1788.73ms elapsed=5176.10s tid=0x00007fa97a581000 nid=0x54 waiting on condition  [0x00007fa8ffbfb000]
      java.lang.Thread.State: WAITING (parking)
        at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
        - parking to wait for  <0x00000007dbaff838> (a java.util.concurrent.CompletableFuture$Signaller)
        at java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:194)
        at java.util.concurrent.CompletableFuture$Signaller.block([email protected]/CompletableFuture.java:1796)
        at java.util.concurrent.ForkJoinPool.managedBlock([email protected]/ForkJoinPool.java:3128)
        at java.util.concurrent.CompletableFuture.waitingGet([email protected]/CompletableFuture.java:1823)
        at java.util.concurrent.CompletableFuture.get([email protected]/CompletableFuture.java:1998)
        at org.apache.pulsar.broker.admin.AdminResource.fetchPartitionedTopicMetadata(AdminResource.java:492)
        at org.apache.pulsar.broker.admin.AdminResource.getPartitionedTopicMetadata(AdminResource.java:480)
        at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.lambda$internalResetCursorOnPosition$212(PersistentTopicsBase.java:2595)
        at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase$$Lambda$1475/0x0000000840b46840.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire([email protected]/CompletableFuture.java:714)
        at java.util.concurrent.CompletableFuture.postComplete([email protected]/CompletableFuture.java:506)
        at java.util.concurrent.CompletableFuture.complete([email protected]/CompletableFuture.java:2073)
        at org.apache.pulsar.broker.web.PulsarWebResource.lambda$checkLocalOrGetPeerReplicationCluster$21(PulsarWebResource.java:859)
        at org.apache.pulsar.broker.web.PulsarWebResource$$Lambda$1135/0x000000084096fc40.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire([email protected]/CompletableFuture.java:714)
        at java.util.concurrent.CompletableFuture.postComplete([email protected]/CompletableFuture.java:506)
        at java.util.concurrent.CompletableFuture.complete([email protected]/CompletableFuture.java:2073)
        at org.apache.pulsar.metadata.impl.ZKMetadataStore.handleGetResult(ZKMetadataStore.java:244)
        at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$7(ZKMetadataStore.java:188)
        at org.apache.pulsar.metadata.impl.ZKMetadataStore$$Lambda$177/0x00000008402da040.processResult(Unknown Source)
        at org.apache.pulsar.metadata.impl.PulsarZooKeeperClient$3$1.processResult(PulsarZooKeeperClient.java:490)
        at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:712)
        at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:553)
      Locked ownable synchronizers:
        - None
   ```
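
   For illustration only, here is a minimal sketch of the failure mode in isolation. It is not
   Pulsar code: a single-threaded executor stands in for the ZooKeeper EventThread, and
   `fetchPartitionsAsync`, `blockingVariant` and `composedVariant` are hypothetical names.
   Callbacks are pinned to the stand-in thread with the `*Async(fn, executor)` overloads so the
   behaviour is deterministic, and the blocking call uses a timeout so the demo terminates
   instead of parking forever.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   public class EventThreadBlockingSketch {

       // Assumption: one thread delivers every metadata result, like the ZooKeeper EventThread.
       static final ExecutorService eventThread = Executors.newSingleThreadExecutor(r -> {
           Thread t = new Thread(r, "sketch-event-thread");
           t.setDaemon(true);
           return t;
       });

       // Hypothetical async lookup; the result is always completed on the event thread.
       static CompletableFuture<Integer> fetchPartitionsAsync(String topic) {
           CompletableFuture<Integer> result = new CompletableFuture<>();
           eventThread.execute(() -> result.complete(0));
           return result;
       }

       // Anti-pattern: the callback occupies the event thread while waiting for a result
       // that only the event thread can deliver. Returns false when the wait times out.
       static CompletableFuture<Boolean> blockingVariant(String topic) {
           return fetchPartitionsAsync(topic).thenApplyAsync(ignored -> {
               try {
                   fetchPartitionsAsync(topic).get(200, TimeUnit.MILLISECONDS);
                   return true;
               } catch (TimeoutException e) {
                   return false; // with a plain get() the thread would stay parked
               } catch (Exception e) {
                   throw new IllegalStateException(e);
               }
           }, eventThread);
       }

       // Non-blocking: the callback only chains the second lookup and returns immediately,
       // so the event thread is free to complete it.
       static CompletableFuture<Boolean> composedVariant(String topic) {
           return fetchPartitionsAsync(topic)
                   .thenComposeAsync(ignored -> fetchPartitionsAsync(topic), eventThread)
                   .thenApply(partitions -> partitions == 0);
       }

       public static void main(String[] args) throws Exception {
           System.out.println("blocking variant finished in time: "
                   + blockingVariant("t").get());
           System.out.println("composed variant finished in time: "
                   + composedVariant("t").get(1, TimeUnit.SECONDS));
       }
   }
   ```

   In this sketch the blocking variant can only finish by timing out, while the composed variant
   completes normally. That is the same shape as replacing the synchronous lookup with
   `getPartitionedTopicMetadataAsync` plus `thenCompose` in the diff above.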
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.