This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new ecf24932dae [improve][admin] Improve partitioned-topic condition
evaluation (#19015)
ecf24932dae is described below
commit ecf24932dae86685e8ff2c8ee730e7f625c6dfa8
Author: gaozhangmin <[email protected]>
AuthorDate: Mon Jan 30 13:45:21 2023 +0800
[improve][admin] Improve partitioned-topic condition evaluation (#19015)
Co-authored-by: gavingaozhangmin <[email protected]>
(cherry picked from commit 3bab0997ffc71973c46abf229f61b6d24b6354e6)
---
.../broker/admin/impl/PersistentTopicsBase.java | 294 +++++++++++----------
1 file changed, 156 insertions(+), 138 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index aed4c29e0ba..50dad31c71c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2544,80 +2544,87 @@ public class PersistentTopicsBase extends AdminResource
{
protected void internalResetCursorOnPosition(AsyncResponse asyncResponse,
String subName, boolean authoritative,
MessageIdImpl messageId, boolean isExcluded, int batchIndex) {
CompletableFuture<Void> ret;
- if (topicName.isGlobal()) {
- ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ // If the topic name is a partition name, no need to get partition
topic metadata again
+ if (!topicName.isPartitioned()) {
+ ret = getPartitionedTopicMetadataAsync(topicName, authoritative,
false)
+ .thenCompose(topicMetadata -> {
+ if (topicMetadata.partitions > 0) {
+ log.warn("[{}] Not supported operation on
partitioned-topic {} {}",
+ clientAppId(), topicName, subName);
+ asyncResponse.resume(new
RestException(Status.METHOD_NOT_ALLOWED,
+ "Reset-cursor at position is not allowed
for partitioned-topic"));
+ }
+ return CompletableFuture.completedFuture(null);
+ });
} else {
ret = CompletableFuture.completedFuture(null);
}
- ret.thenAccept(__ -> {
+
+ CompletableFuture<Void> future;
+ if (topicName.isGlobal()) {
+ future = ret.thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName));
+ } else {
+ future = CompletableFuture.completedFuture(null);
+ }
+ future.thenAccept(__ -> {
log.info("[{}][{}] received reset cursor on subscription {} to
position {}", clientAppId(), topicName,
subName, messageId);
- // If the topic name is a partition name, no need to get partition
topic metadata again
- if (!topicName.isPartitioned()
- && getPartitionedTopicMetadata(topicName, authoritative,
false).partitions > 0) {
- log.warn("[{}] Not supported operation on partitioned-topic {}
{}", clientAppId(), topicName,
- subName);
- asyncResponse.resume(new
RestException(Status.METHOD_NOT_ALLOWED,
- "Reset-cursor at position is not allowed for
partitioned-topic"));
- return;
- } else {
- validateTopicOwnershipAsync(topicName, authoritative)
- .thenCompose(ignore ->
- validateTopicOperationAsync(topicName,
TopicOperation.RESET_CURSOR, subName))
- .thenCompose(ignore ->
getTopicReferenceAsync(topicName))
- .thenAccept(topic -> {
- if (topic == null) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND,
-
getTopicNotFoundErrorMessage(topicName.toString())));
- return;
- }
- PersistentSubscription sub =
((PersistentTopic) topic).getSubscription(subName);
- if (sub == null) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND,
-
getSubNotFoundErrorMessage(topicName.toString(), subName)));
- return;
+ validateTopicOwnershipAsync(topicName, authoritative)
+ .thenCompose(ignore ->
+ validateTopicOperationAsync(topicName,
TopicOperation.RESET_CURSOR, subName))
+ .thenCompose(ignore -> getTopicReferenceAsync(topicName))
+ .thenAccept(topic -> {
+ if (topic == null) {
+ asyncResponse.resume(new
RestException(Status.NOT_FOUND,
+
getTopicNotFoundErrorMessage(topicName.toString())));
+ return;
+ }
+ PersistentSubscription sub = ((PersistentTopic)
topic).getSubscription(subName);
+ if (sub == null) {
+ asyncResponse.resume(new
RestException(Status.NOT_FOUND,
+
getSubNotFoundErrorMessage(topicName.toString(), subName)));
+ return;
+ }
+ CompletableFuture<Integer> batchSizeFuture = new
CompletableFuture<>();
+ getEntryBatchSize(batchSizeFuture, (PersistentTopic)
topic, messageId, batchIndex);
+ batchSizeFuture.thenAccept(bi -> {
+ PositionImpl seekPosition =
calculatePositionAckSet(isExcluded, bi, batchIndex,
+ messageId);
+ sub.resetCursor(seekPosition).thenRun(() -> {
+ log.info("[{}][{}] successfully reset cursor
on subscription {}"
+ + " to position {}",
clientAppId(),
+ topicName, subName, messageId);
+
asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(ex -> {
+ Throwable t = (ex instanceof
CompletionException ? ex.getCause() : ex);
+ log.warn("[{}][{}] Failed to reset cursor on
subscription {}"
+ + " to position {}",
clientAppId(),
+ topicName, subName, messageId, t);
+ if (t instanceof
SubscriptionInvalidCursorPosition) {
+ asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
+ "Unable to find position for
position specified: "
+ + t.getMessage()));
+ } else if (t instanceof
SubscriptionBusyException) {
+ asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
+ "Failed for Subscription Busy: " +
t.getMessage()));
+ } else {
+
resumeAsyncResponseExceptionally(asyncResponse, t);
}
- CompletableFuture<Integer> batchSizeFuture =
new CompletableFuture<>();
- getEntryBatchSize(batchSizeFuture,
(PersistentTopic) topic, messageId, batchIndex);
- batchSizeFuture.thenAccept(bi -> {
- PositionImpl seekPosition =
calculatePositionAckSet(isExcluded, bi, batchIndex,
- messageId);
- sub.resetCursor(seekPosition).thenRun(()
-> {
- log.info("[{}][{}] successfully reset
cursor on subscription {}"
- + " to position {}",
clientAppId(),
- topicName, subName, messageId);
-
asyncResponse.resume(Response.noContent().build());
- }).exceptionally(ex -> {
- Throwable t = (ex instanceof
CompletionException ? ex.getCause() : ex);
- log.warn("[{}][{}] Failed to reset
cursor on subscription {}"
- + " to position {}",
clientAppId(),
- topicName, subName,
messageId, t);
- if (t instanceof
SubscriptionInvalidCursorPosition) {
- asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
- "Unable to find position
for position specified: "
- + t.getMessage()));
- } else if (t instanceof
SubscriptionBusyException) {
- asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
- "Failed for Subscription
Busy: " + t.getMessage()));
- } else {
-
resumeAsyncResponseExceptionally(asyncResponse, t);
- }
- return null;
- });
- }).exceptionally(e -> {
- asyncResponse.resume(e);
- return null;
- });
- }).exceptionally(ex -> {
- // If the exception is not redirect exception we
need to log it.
- if (!isRedirectException(ex)) {
- log.warn("[{}][{}] Failed to reset cursor on
subscription {} to position {}",
- clientAppId(), topicName, subName,
messageId, ex.getCause());
- }
- resumeAsyncResponseExceptionally(asyncResponse,
ex.getCause());
+ return null;
+ });
+ }).exceptionally(e -> {
+ asyncResponse.resume(e);
return null;
});
- }
+ }).exceptionally(ex -> {
+ // If the exception is not redirect exception we need
to log it.
+ if (!isRedirectException(ex)) {
+ log.warn("[{}][{}] Failed to reset cursor on
subscription {} to position {}",
+ clientAppId(), topicName, subName,
messageId, ex.getCause());
+ }
+ resumeAsyncResponseExceptionally(asyncResponse,
ex.getCause());
+ return null;
+ });
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
@@ -3146,68 +3153,65 @@ public class PersistentTopicsBase extends AdminResource
{
protected void internalGetBacklogSizeByMessageId(AsyncResponse
asyncResponse,
MessageIdImpl messageId,
boolean authoritative) {
- CompletableFuture<Void> future;
- if (topicName.isGlobal()) {
- future = validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- future = CompletableFuture.completedFuture(null);
- }
- future.thenAccept(__ -> {
- getPartitionedTopicMetadataAsync(topicName, authoritative, false)
- .thenAccept(partitionMetadata -> {
- if (!topicName.isPartitioned() &&
partitionMetadata.partitions > 0) {
+ CompletableFuture<Void> ret;
+ // If the topic name is a partition name, no need to get partition
topic metadata again
+ if (!topicName.isPartitioned()) {
+ ret = getPartitionedTopicMetadataAsync(topicName, authoritative,
false)
+ .thenCompose(topicMetadata -> {
+ if (topicMetadata.partitions > 0) {
log.warn("[{}] Not supported calculate backlog
size operation on partitioned-topic {}",
clientAppId(), topicName);
asyncResponse.resume(new
RestException(Status.METHOD_NOT_ALLOWED,
"calculate backlog size is not allowed for
partitioned-topic"));
- } else {
- validateTopicOwnershipAsync(topicName,
authoritative)
- .thenCompose(unused ->
validateTopicOperationAsync(topicName,
- TopicOperation.GET_BACKLOG_SIZE))
- .thenCompose(unused ->
getTopicReferenceAsync(topicName))
- .thenAccept(t -> {
- PersistentTopic topic =
(PersistentTopic) t;
- PositionImpl pos = new
PositionImpl(messageId.getLedgerId(),
- messageId.getEntryId());
- if (topic == null) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND,
-
getTopicNotFoundErrorMessage(topicName.toString())));
- return;
- }
- ManagedLedgerImpl managedLedger =
- (ManagedLedgerImpl)
topic.getManagedLedger();
- if (messageId.getLedgerId() == -1) {
-
asyncResponse.resume(managedLedger.getTotalSize());
- } else {
-
asyncResponse.resume(managedLedger.getEstimatedBacklogSize(pos));
- }
- }).exceptionally(ex -> {
- // If the exception is not redirect
exception we need to log it.
- if (!isRedirectException(ex)) {
- log.error("[{}] Failed to get
backlog size for topic {}", clientAppId(),
- topicName, ex);
- }
-
resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
- });
}
- }).exceptionally(ex -> {
+ return CompletableFuture.completedFuture(null);
+ });
+ } else {
+ ret = CompletableFuture.completedFuture(null);
+ }
+ CompletableFuture<Void> future;
+ if (topicName.isGlobal()) {
+ future = ret.thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName));
+ } else {
+ future = ret;
+ }
+ future.thenAccept(__ -> validateTopicOwnershipAsync(topicName,
authoritative)
+ .thenCompose(unused -> validateTopicOperationAsync(topicName,
+ TopicOperation.GET_BACKLOG_SIZE))
+ .thenCompose(unused -> getTopicReferenceAsync(topicName))
+ .thenAccept(t -> {
+ PersistentTopic topic = (PersistentTopic) t;
+ PositionImpl pos = new
PositionImpl(messageId.getLedgerId(),
+ messageId.getEntryId());
+ if (topic == null) {
+ asyncResponse.resume(new
RestException(Status.NOT_FOUND,
+
getTopicNotFoundErrorMessage(topicName.toString())));
+ return;
+ }
+ ManagedLedgerImpl managedLedger =
+ (ManagedLedgerImpl) topic.getManagedLedger();
+ if (messageId.getLedgerId() == -1) {
+ asyncResponse.resume(managedLedger.getTotalSize());
+ } else {
+
asyncResponse.resume(managedLedger.getEstimatedBacklogSize(pos));
+ }
+ }).exceptionally(ex -> {
+ // If the exception is not redirect exception we need to
log it.
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get backlog size for topic
{}", clientAppId(),
+ topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ })).exceptionally(ex -> {
// If the exception is not redirect exception we need
to log it.
if (!isRedirectException(ex)) {
- log.error("[{}] Failed to get backlog size for
topic {}", clientAppId(), topicName, ex);
+ log.error("[{}] Failed to validate global
namespace ownership "
+ + "to get backlog size for topic {}",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
- });
- }).exceptionally(ex -> {
- // If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
- log.error("[{}] Failed to validate global namespace ownership
to get backlog size for topic "
- + "{}", clientAppId(), topicName, ex);
- }
- resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
- });
+ });
}
protected CompletableFuture<Void>
internalSetBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType,
@@ -3667,6 +3671,14 @@ public class PersistentTopicsBase extends AdminResource {
}
protected void internalTerminatePartitionedTopic(AsyncResponse
asyncResponse, boolean authoritative) {
+ if (topicName.isPartitioned()) {
+ String msg = "Termination of a non-partitioned topic is not
allowed using partitioned-terminate"
+ + ", please use terminate commands";
+ log.error("[{}] [{}] {}", clientAppId(), topicName, msg);
+ asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
msg));
+ return;
+ }
+
CompletableFuture<Void> future;
if (topicName.isGlobal()) {
future = validateGlobalNamespaceOwnershipAsync(namespaceName);
@@ -3896,11 +3908,33 @@ public class PersistentTopicsBase extends AdminResource
{
protected void internalExpireMessagesByPosition(AsyncResponse
asyncResponse, String subName, boolean authoritative,
MessageIdImpl messageId,
boolean isExcluded, int batchIndex) {
+ if (messageId.getPartitionIndex() != topicName.getPartitionIndex()) {
+ String msg = "Invalid parameter for expire message by position,
partition index of "
+ + "passed in message position doesn't match partition
index for the topic";
+ log.warn("[{}] {} {}({}).", clientAppId(), msg, topicName,
messageId);
+ asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
msg));
+ return;
+ }
+ CompletableFuture<Void> ret;
+ // If the topic name is a partition name, no need to get partition
topic metadata again
+ if (!topicName.isPartitioned()) {
+ ret = getPartitionedTopicMetadataAsync(topicName, authoritative,
false)
+ .thenCompose(topicMetadata -> {
+ if (topicMetadata.partitions > 0) {
+ String msg = "Expire message at position is not
supported for partitioned-topic";
+ log.warn("[{}] {} {}({}) {}", clientAppId(), msg,
topicName, messageId, subName);
+ asyncResponse.resume(new
RestException(Status.METHOD_NOT_ALLOWED, msg));
+ }
+ return CompletableFuture.completedFuture(null);
+ });
+ } else {
+ ret = CompletableFuture.completedFuture(null);
+ }
CompletableFuture<Void> future;
if (topicName.isGlobal()) {
- future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ future = ret.thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName));
} else {
- future = CompletableFuture.completedFuture(null);
+ future = ret;
}
future.thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
@@ -3908,24 +3942,8 @@ public class PersistentTopicsBase extends AdminResource {
.thenCompose(__ -> {
log.info("[{}][{}] received expire messages on
subscription {} to position {}", clientAppId(),
topicName, subName, messageId);
- return getPartitionedTopicMetadataAsync(topicName,
authoritative, false)
- .thenAccept(partitionMetadata -> {
- if (!topicName.isPartitioned() &&
partitionMetadata.partitions > 0) {
- String msg = "Expire message at position
is not supported for partitioned-topic";
- log.warn("[{}] {} {}({}) {}",
clientAppId(), msg, topicName, messageId, subName);
- asyncResponse.resume(new
RestException(Status.METHOD_NOT_ALLOWED, msg));
- return;
- } else if (messageId.getPartitionIndex() !=
topicName.getPartitionIndex()) {
- String msg = "Invalid parameter for expire
message by position, partition index of "
- + "passed in message position
doesn't match partition index for the topic";
- log.warn("[{}] {} {}({}).", clientAppId(),
msg, topicName, messageId);
- asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED, msg));
- return;
- } else {
-
internalExpireMessagesNonPartitionedTopicByPosition(asyncResponse, subName,
- messageId, isExcluded, batchIndex);
- }
- });
+ return
internalExpireMessagesNonPartitionedTopicByPosition(asyncResponse, subName,
+ messageId, isExcluded, batchIndex);
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
if (!isRedirectException(ex)) {