lhotari commented on code in PR #19015:
URL: https://github.com/apache/pulsar/pull/19015#discussion_r1091886551
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2608,80 +2608,87 @@ protected void internalGetSubscriptionProperties(AsyncResponse asyncResponse, St
protected void internalResetCursorOnPosition(AsyncResponse asyncResponse,
String subName, boolean authoritative,
MessageIdImpl messageId, boolean isExcluded, int batchIndex) {
CompletableFuture<Void> ret;
- if (topicName.isGlobal()) {
- ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ // If the topic name is a partition name, no need to get partition topic metadata again
+ if (!topicName.isPartitioned()) {
+ ret = getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+ .thenCompose(topicMetadata -> {
+ if (topicMetadata.partitions > 0) {
+ log.warn("[{}] Not supported operation on partitioned-topic {} {}",
+ clientAppId(), topicName, subName);
+ asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+ "Reset-cursor at position is not allowed for partitioned-topic"));
+ }
+ return CompletableFuture.completedFuture(null);
+ });
} else {
ret = CompletableFuture.completedFuture(null);
}
- ret.thenAccept(__ -> {
+
+ CompletableFuture<Void> future;
+ if (topicName.isGlobal()) {
+ future = ret.thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(namespaceName));
+ } else {
+ future = CompletableFuture.completedFuture(null);
+ }
+ future.thenAccept(__ -> {
log.info("[{}][{}] received reset cursor on subscription {} to position {}", clientAppId(), topicName,
subName, messageId);
- // If the topic name is a partition name, no need to get partition topic metadata again
- if (!topicName.isPartitioned()
- && getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) {
Review Comment:
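This fixes an issue where the ZooKeeper session dies because the ZooKeeper client's EventThread is blocked: the synchronous getPartitionedTopicMetadata call ran inside the thenAccept callback on that thread and waited on CompletableFuture.get().
Stack trace: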
```
"main-EventThread" #17 daemon prio=5 os_prio=0 cpu=1788.73ms
elapsed=5176.10s tid=0x00007fa97a581000 nid=0x54 waiting on condition
[0x00007fa8ffbfb000]
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
- parking to wait for <0x00000007dbaff838> (a java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:194)
at java.util.concurrent.CompletableFuture$Signaller.block([email protected]/CompletableFuture.java:1796)
at java.util.concurrent.ForkJoinPool.managedBlock([email protected]/ForkJoinPool.java:3128)
at java.util.concurrent.CompletableFuture.waitingGet([email protected]/CompletableFuture.java:1823)
at java.util.concurrent.CompletableFuture.get([email protected]/CompletableFuture.java:1998)
at org.apache.pulsar.broker.admin.AdminResource.fetchPartitionedTopicMetadata(AdminResource.java:492)
at org.apache.pulsar.broker.admin.AdminResource.getPartitionedTopicMetadata(AdminResource.java:480)
at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.lambda$internalResetCursorOnPosition$212(PersistentTopicsBase.java:2595)
at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase$$Lambda$1475/0x0000000840b46840.accept(Unknown Source)
at java.util.concurrent.CompletableFuture$UniAccept.tryFire([email protected]/CompletableFuture.java:714)
at java.util.concurrent.CompletableFuture.postComplete([email protected]/CompletableFuture.java:506)
at java.util.concurrent.CompletableFuture.complete([email protected]/CompletableFuture.java:2073)
at org.apache.pulsar.broker.web.PulsarWebResource.lambda$checkLocalOrGetPeerReplicationCluster$21(PulsarWebResource.java:859)
at org.apache.pulsar.broker.web.PulsarWebResource$$Lambda$1135/0x000000084096fc40.accept(Unknown Source)
at java.util.concurrent.CompletableFuture$UniAccept.tryFire([email protected]/CompletableFuture.java:714)
at java.util.concurrent.CompletableFuture.postComplete([email protected]/CompletableFuture.java:506)
at java.util.concurrent.CompletableFuture.complete([email protected]/CompletableFuture.java:2073)
at org.apache.pulsar.metadata.impl.ZKMetadataStore.handleGetResult(ZKMetadataStore.java:244)
at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$7(ZKMetadataStore.java:188)
at org.apache.pulsar.metadata.impl.ZKMetadataStore$$Lambda$177/0x00000008402da040.processResult(Unknown Source)
at org.apache.pulsar.metadata.impl.PulsarZooKeeperClient$3$1.processResult(PulsarZooKeeperClient.java:490)
at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:712)
at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:553)
Locked ownable synchronizers:
- None
```
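To make the failure mode in the trace concrete, here is a minimal sketch that is independent of Pulsar and ZooKeeper. It assumes a single-threaded executor as a stand-in for the ZooKeeper EventThread and a hypothetical `lookupPartitionsAsync` helper whose result can only be delivered on that same thread; neither name comes from the Pulsar code base. The first method blocks inside a callback (with a bounded wait so the demo terminates, whereas the untimed `get()` in the trace would stay parked), the second composes the futures instead.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class EventThreadBlockingSketch {

    // Hypothetical async lookup: the result is always delivered by a task
    // queued on the single callback ("event") thread.
    static CompletableFuture<Integer> lookupPartitionsAsync(ExecutorService eventThread) {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        eventThread.execute(() -> result.complete(0));
        return result;
    }

    // Anti-pattern: a callback running on the event thread waits for a result
    // that only the same thread can deliver. Returns true if the wait timed out.
    static boolean blockingCallbackStalls() {
        ExecutorService eventThread = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Boolean> stalled = new CompletableFuture<>();
            eventThread.execute(() -> {
                try {
                    // Bounded wait so the sketch finishes; the completing task is
                    // queued behind this one and cannot run while we wait.
                    lookupPartitionsAsync(eventThread).get(200, TimeUnit.MILLISECONDS);
                    stalled.complete(false);
                } catch (TimeoutException e) {
                    stalled.complete(true);
                } catch (InterruptedException | ExecutionException e) {
                    stalled.completeExceptionally(e);
                }
            });
            return stalled.join();
        } finally {
            eventThread.shutdownNow();
        }
    }

    // Non-blocking variant: the callback only starts the lookup and returns,
    // and thenCompose flattens the nested future, so the event thread stays free.
    static int composedCallbackCompletes() {
        ExecutorService eventThread = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Integer> partitions = CompletableFuture
                    .supplyAsync(() -> lookupPartitionsAsync(eventThread), eventThread)
                    .thenCompose(pending -> pending);
            // The caller (not the event thread) waits, with a safety timeout.
            return partitions.orTimeout(5, TimeUnit.SECONDS).join();
        } finally {
            eventThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("blocking callback stalled: " + blockingCallbackStalls());
        System.out.println("composed callback partitions: " + composedCallbackCompletes());
    }
}
```

The composed variant mirrors the shape of the change in this hunk, where the synchronous `getPartitionedTopicMetadata(...)` call inside `thenAccept` is replaced by `getPartitionedTopicMetadataAsync(...).thenCompose(...)`.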