This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4d7c7d0db52 [improve][broker] Refactor update topic partitions
endpoint. (#19166)
4d7c7d0db52 is described below
commit 4d7c7d0db5204f4ad52fd5858394cd9455f3837d
Author: Qiang Zhao <[email protected]>
AuthorDate: Wed Jan 18 16:34:36 2023 +0800
[improve][broker] Refactor update topic partitions endpoint. (#19166)
---
.../apache/pulsar/broker/admin/AdminResource.java | 15 -
.../broker/admin/impl/PersistentTopicsBase.java | 348 +++++++++------------
.../pulsar/broker/admin/v1/PersistentTopics.java | 38 +--
.../pulsar/broker/admin/v2/PersistentTopics.java | 35 +--
.../pulsar/broker/admin/PersistentTopicsTest.java | 2 +-
5 files changed, 186 insertions(+), 252 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 1632831b634..f00717377ef 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -165,21 +165,6 @@ public abstract class AdminResource extends
PulsarWebResource {
return FutureUtil.waitForAll(futures);
}
- protected CompletableFuture<Void> tryCreateExtendedPartitionsAsync(int
oldNumPartitions, int numPartitions) {
- if (!topicName.isPersistent()) {
- return CompletableFuture.completedFuture(null);
- }
- if (numPartitions <= oldNumPartitions) {
- return CompletableFuture.failedFuture(new
RestException(Status.NOT_ACCEPTABLE,
- "Number of new partitions must be greater than existing
number of partitions"));
- }
- List<CompletableFuture<Void>> futures = new ArrayList<>(numPartitions
- oldNumPartitions);
- for (int i = oldNumPartitions; i < numPartitions; i++) {
- futures.add(tryCreatePartitionAsync(i));
- }
- return FutureUtil.waitForAll(futures);
- }
-
private CompletableFuture<Void> tryCreatePartitionAsync(final int
partition) {
CompletableFuture<Void> result = new CompletableFuture<>();
getPulsarResources().getTopicResources().createPersistentTopicAsync(topicName.getPartition(partition))
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 50724142b0c..73f25914d7a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -23,6 +23,7 @@ import static
org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInte
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectReader;
import com.github.zafarkhaja.semver.Version;
+import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
@@ -44,6 +45,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import javax.annotation.Nonnull;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
@@ -129,7 +131,6 @@ import
org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionPolicies;
-import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
@@ -408,89 +409,160 @@ public class PersistentTopicsBase extends AdminResource {
* already exist and number of new partitions must be greater than
existing number of partitions. Decrementing
* number of partitions requires deletion of topic which is not supported.
*
- * Already created partitioned producers and consumers can't see newly
created partitions and it requires to
- * recreate them at application so, newly created producers and consumers
can connect to newly added partitions as
- * well. Therefore, it can violate partition ordering at producers until
all producers are restarted at application.
- *
- * @param expectPartitions
- * @param updateLocalTopicOnly
- * @param authoritative
- * @param force
+ * @exception RestException Unprocessable entity, status code: 422. throw
it when some pre-check failed.
+ * @exception RestException Internal Server Error, status code: 500. throw
it when get unknown Exception
*/
- protected CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int
expectPartitions,
-
boolean updateLocalTopicOnly,
-
boolean authoritative, boolean force) {
- if (expectPartitions <= 0) {
- return FutureUtil.failedFuture(
- new RestException(Status.NOT_ACCEPTABLE, "Number of
partitions should be more than 0"));
- }
- return validateTopicOwnershipAsync(topicName, authoritative)
- .thenCompose(__ ->
- validateTopicPolicyOperationAsync(topicName,
PolicyName.PARTITION, PolicyOperation.WRITE))
- .thenCompose(__ -> {
- if (!updateLocalTopicOnly && !force) {
- return
validatePartitionTopicUpdateAsync(topicName.getLocalName(), expectPartitions);
- } else {
- return CompletableFuture.completedFuture(null);
+ protected @Nonnull CompletableFuture<Void>
internalUpdatePartitionedTopicAsync(int expectPartitions,
+
boolean updateLocal,
+
boolean force) {
+ PulsarService pulsarService = pulsar();
+ return
pulsarService.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)
+ .thenCompose(partitionedTopicMetadata -> {
+ int currentMetadataPartitions =
partitionedTopicMetadata.partitions;
+ if (currentMetadataPartitions <= 0) {
+ throw new RestException(422 /* Unprocessable entity*/,
+ String.format("Topic %s is not the partitioned
topic.", topicName));
}
- }).thenCompose(__ ->
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName))
- .thenCompose(topicMetadata -> {
- final int maxPartitions =
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
- if (maxPartitions > 0 && expectPartitions > maxPartitions) {
- throw new RestException(Status.NOT_ACCEPTABLE,
- "Number of partitions should be less than or equal
to " + maxPartitions);
+ if (expectPartitions < currentMetadataPartitions) {
+ throw new RestException(422 /* Unprocessable entity*/,
+ String.format("Expect partitions %s can't less
than current partitions %s.",
+ expectPartitions,
currentMetadataPartitions));
}
- final PulsarAdmin adminClient;
+ int brokerMaximumPartitionsPerTopic =
pulsarService.getConfiguration()
+ .getMaxNumPartitionsPerPartitionedTopic();
+ if (brokerMaximumPartitionsPerTopic != 0 && expectPartitions >
brokerMaximumPartitionsPerTopic) {
+ throw new RestException(422 /* Unprocessable entity*/,
+ String.format("Expect partitions %s grater than
maximum partitions per topic %s",
+ expectPartitions,
brokerMaximumPartitionsPerTopic));
+ }
+ final PulsarAdmin admin;
try {
- adminClient = pulsar().getAdminClient();
- } catch (PulsarServerException e) {
- throw new RuntimeException(e);
+ admin = pulsarService.getAdminClient();
+ } catch (PulsarServerException ex) {
+ throw new RestException(Status.INTERNAL_SERVER_ERROR,
Throwables.getRootCause(ex));
}
- return
adminClient.topics().getListAsync(topicName.getNamespace())
- .thenCompose(topics -> {
- long existPartitions = topics.stream()
- .filter(t ->
TopicName.get(t).getPartitionedTopicName()
-
.equals(topicName.getPartitionedTopicName()))
- .count();
- if (existPartitions >= expectPartitions) {
- throw new RestException(Status.CONFLICT,
- "Number of new partitions must be
greater than existing number of partitions");
- }
- // Only do the validation if it's the first hop.
- if (topicName.isGlobal() &&
isNamespaceReplicated(topicName.getNamespaceObject())) {
- return
getNamespaceReplicatedClustersAsync(topicName.getNamespaceObject())
- .thenApply(clusters -> {
- if
(!clusters.contains(pulsar().getConfig().getClusterName())) {
- log.error("[{}] local cluster
is not part of"
- + " replicated
cluster for namespace {}",
- clientAppId(),
topicName);
- throw new
RestException(Status.FORBIDDEN,
- "Local cluster is not
part of replicate cluster list");
- }
- return clusters;
- })
- .thenCompose(clusters ->
-
tryCreatePartitionsAsync(expectPartitions)
- .thenApply(ignore ->
clusters))
- .thenCompose(clusters -> {
- if (!updateLocalTopicOnly) {
- return
namespaceResources().getPartitionedTopicResources()
-
.updatePartitionedTopicAsync(topicName, p ->
- new
PartitionedTopicMetadata(expectPartitions,
-
p.properties))
- .thenCompose(__ ->
-
updatePartitionInOtherCluster(expectPartitions,
-
clusters));
- } else {
- return
CompletableFuture.completedFuture(null);
- }
- }).thenCompose(clusters ->
createSubscriptions(topicName,
- expectPartitions));
- } else {
- return
tryCreatePartitionsAsync(expectPartitions)
- .thenCompose(ignore ->
updatePartitionedTopic(topicName, expectPartitions));
- }
- });
+ final CompletableFuture<Void> checkFuture;
+ if (!force) {
+ checkFuture =
admin.topics().getListAsync(topicName.getNamespace(), topicName.getDomain())
+ .thenAccept(topics -> {
+ final List<TopicName> existingPartitions =
topics.stream()
+ .map(TopicName::get)
+ .filter(candidateTopicName ->
candidateTopicName.getPartitionedTopicName()
+
.equals(topicName.getPartitionedTopicName()))
+ .collect(Collectors.toList());
+ // Check whether exist unexpected partition
+ Optional<Integer> maximumPartitionIndex =
existingPartitions.stream()
+ .map(TopicName::getPartitionIndex)
+ .max(Integer::compareTo);
+ if (maximumPartitionIndex.isPresent()
+ && maximumPartitionIndex.get() >=
currentMetadataPartitions) {
+ List<String> unexpectedPartitions =
existingPartitions.stream()
+ .filter(candidateTopicName ->
+ candidateTopicName
+
.getPartitionIndex() > currentMetadataPartitions)
+ .map(TopicName::toString)
+ .collect(Collectors.toList());
+ throw new
RestException(Status.CONFLICT,
+ String.format(
+ "Exist unexpected
topic partition(partition index grater than"
+ + " current
metadata maximum index %s) %s ",
+
currentMetadataPartitions,
+
StringUtils.join(unexpectedPartitions, ",")));
+ }
+ });
+ } else {
+ checkFuture = CompletableFuture.completedFuture(null);
+ }
+ return checkFuture.thenCompose(topics -> {
+ final CompletableFuture<Void> updateMetadataFuture =
(expectPartitions == currentMetadataPartitions)
+ // current metadata partitions is equals to expect
partitions
+ ? CompletableFuture.completedFuture(null)
+ // update current cluster topic metadata
+ :
namespaceResources().getPartitionedTopicResources()
+ .updatePartitionedTopicAsync(topicName, m ->
+ new
PartitionedTopicMetadata(expectPartitions, m.properties));
+ return updateMetadataFuture
+ // create missing partitions
+ .thenCompose(__ ->
tryCreatePartitionsAsync(expectPartitions))
+ // because we should consider the compatibility.
+ // Copy subscriptions from partition 0 instead of being
created by the customer
+ .thenCompose(__ ->
+
admin.topics().getStatsAsync(topicName.getPartition(0).toString())
+ .thenCompose(stats -> {
+ List<CompletableFuture<Void>> futures
= stats.getSubscriptions().entrySet()
+ // We must not re-create
non-durable subscriptions on the new partitions
+ .stream().filter(entry ->
entry.getValue().isDurable())
+ .map(entry -> {
+ final
List<CompletableFuture<Void>> innerFutures =
+ new
ArrayList<>(expectPartitions);
+ for (int i = 0; i <
expectPartitions; i++) {
+
innerFutures.add(admin.topics().createSubscriptionAsync(
+
topicName.getPartition(i).toString(),
+
entry.getKey(), MessageId.earliest,
+
entry.getValue().isReplicated(),
+
entry.getValue().getSubscriptionProperties())
+
.exceptionally(ex -> {
+ Throwable
rc =
+
FutureUtil.unwrapCompletionException(ex);
+ if (!(rc
instanceof PulsarAdminException
+
.ConflictException)) {
+
log.warn("[{}] got an error while copying"
+
+ " the subscription to the"
+
+ " partition {}.", topicName,
+
Throwables.getRootCause(rc));
+ throw
FutureUtil.wrapToCompletionException(rc);
+ }
+ // Ignore
subscription already exist exception
+ return
null;
+ }));
+ }
+ return
FutureUtil.waitForAll(innerFutures);
+
}).collect(Collectors.toList());
+ return FutureUtil.waitForAll(futures);
+ })
+ );
+ }).thenCompose(__ -> {
+ if (updateLocal || !topicName.isGlobal()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ // update remote cluster
+ return namespaceResources().getPoliciesAsync(namespaceName)
+ .thenCompose(policies -> {
+ if (!policies.isPresent()) {
+ return
CompletableFuture.completedFuture(null);
+ }
+ final Set<String> replicationClusters =
policies.get().replication_clusters;
+ if (replicationClusters.size() == 0) {
+ return
CompletableFuture.completedFuture(null);
+ }
+ boolean containsCurrentCluster =
+
replicationClusters.contains(pulsar().getConfig().getClusterName());
+ if (!containsCurrentCluster) {
+ log.error("[{}] local cluster is not part
of replicated cluster for namespace {}",
+ clientAppId(), topicName);
+ throw new RestException(422,
+ "Local cluster is not part of
replicate cluster list");
+ }
+ if (replicationClusters.size() == 1) {
+ // The replication clusters just has the
current cluster itself.
+ return
CompletableFuture.completedFuture(null);
+ }
+ List<CompletableFuture<Void>> futures =
replicationClusters.stream()
+ .map(replicationCluster ->
admin.clusters().getClusterAsync(replicationCluster)
+ .thenCompose(clusterData ->
pulsarService.getBrokerService()
+
.getClusterPulsarAdmin(replicationCluster, Optional.of(clusterData))
+
.topics().updatePartitionedTopicAsync(topicName.toString(),
+ expectPartitions,
true, force)
+ .exceptionally(ex -> {
+ log.warn("[{}][{}]
Update remote cluster partition fail.",
+
topicName, replicationCluster, ex);
+ throw
FutureUtil.wrapToCompletionException(ex);
+ })
+ )
+ ).collect(Collectors.toList());
+ return FutureUtil.waitForAll(futures);
+ });
+ });
});
}
@@ -529,24 +601,6 @@ public class PersistentTopicsBase extends AdminResource {
});
}
- private CompletableFuture<Void> updatePartitionInOtherCluster(int
numPartitions, Set<String> clusters) {
- List<CompletableFuture<Void>> results = new
ArrayList<>(clusters.size() - 1);
- clusters.forEach(cluster -> {
- if (cluster.equals(pulsar().getConfig().getClusterName())) {
- return;
- }
- CompletableFuture<Void> updatePartitionTopicFuture =
-
pulsar().getPulsarResources().getClusterResources().getClusterAsync(cluster)
- .thenApply(clusterDataOp ->
-
pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterDataOp))
- .thenCompose(pulsarAdmin ->
- pulsarAdmin.topics().updatePartitionedTopicAsync(
- topicName.toString(), numPartitions, true,
false));
- results.add(updatePartitionTopicFuture);
- });
- return FutureUtil.waitForAll(results);
- }
-
protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean
authoritative,
boolean
checkAllowAutoCreation) {
return sync(() -> internalGetPartitionedMetadataAsync(authoritative,
checkAllowAutoCreation));
@@ -4389,106 +4443,6 @@ public class PersistentTopicsBase extends AdminResource
{
}
}
- private CompletableFuture<Void> updatePartitionedTopic(TopicName
topicName, int expectPartitions) {
- CompletableFuture<Void> future =
namespaceResources().getPartitionedTopicResources()
- .updatePartitionedTopicAsync(topicName, p ->
- new PartitionedTopicMetadata(expectPartitions,
p.properties));
- future.exceptionally(ex -> {
- // If the update operation fails, clean up the partitions that
were created
- getPartitionedTopicMetadataAsync(topicName, false, false)
- .thenAccept(metadata -> {
- int oldPartition = metadata.partitions;
- for (int i = oldPartition; i < expectPartitions; i++) {
-
topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1
-> {
- log.warn("[{}] Failed to clean up managedLedger {}",
clientAppId(), topicName,
- ex1.getCause());
- return null;
- });
- }
- }).exceptionally(e -> {
- log.warn("[{}] Failed to clean up managedLedger", topicName,
e);
- return null;
- });
- return null;
- });
- return future.thenCompose(__ -> createSubscriptions(topicName,
expectPartitions));
- }
-
- /**
- * It creates subscriptions for new partitions of existing
partitioned-topics.
- *
- * @param topicName : topic-name: persistent://prop/cluster/ns/topic
- * @param expectPartitions : number of expected partitions
- *
- */
- private CompletableFuture<Void> createSubscriptions(TopicName topicName,
int expectPartitions) {
- CompletableFuture<Void> result = new CompletableFuture<>();
- if (expectPartitions < 1) {
- return FutureUtil.failedFuture(new RestException(Status.CONFLICT,
"Topic is not partitioned topic"));
- }
- PulsarAdmin admin;
- try {
- admin = pulsar().getAdminClient();
- } catch (PulsarServerException e1) {
- return FutureUtil.failedFuture(e1);
- }
-
-
admin.topics().getStatsAsync(topicName.getPartition(0).toString()).thenAccept(stats
-> {
- List<CompletableFuture<Void>> subscriptionFutures = new
ArrayList<>();
-
- stats.getSubscriptions().entrySet().forEach(e -> {
- String subscription = e.getKey();
- SubscriptionStats ss = e.getValue();
- if (!ss.isDurable()) {
- // We must not re-create non-durable subscriptions on the
new partitions
- return;
- }
- boolean replicated = ss.isReplicated();
-
- for (int i = 0; i < expectPartitions; i++) {
- final String topicNamePartition =
topicName.getPartition(i).toString();
- CompletableFuture<Void> future = new CompletableFuture<>();
- admin.topics().createSubscriptionAsync(topicNamePartition,
- subscription, MessageId.earliest,
replicated, ss.getSubscriptionProperties())
- .whenComplete((__, ex) -> {
- if (ex == null) {
- future.complete(null);
- } else {
- Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
- if (realCause instanceof
PulsarAdminException.ConflictException) {
- future.complete(null);
- } else {
- future.completeExceptionally(realCause);
- }
- }
- });
- subscriptionFutures.add(future);
- }
- });
-
- FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> {
- log.info("[{}] Successfully created subscriptions on new
partitions {}", clientAppId(), topicName);
- result.complete(null);
- }).exceptionally(ex -> {
- log.warn("[{}] Failed to create subscriptions on new
partitions for {}",
- clientAppId(), topicName, ex);
- result.completeExceptionally(ex);
- return null;
- });
- }).exceptionally(ex -> {
- if (ex.getCause() instanceof
PulsarAdminException.NotFoundException) {
- // The first partition doesn't exist, so there are currently
to subscriptions to recreate
- result.complete(null);
- } else {
- log.warn("[{}] Failed to get list of subscriptions of {}",
- clientAppId(), topicName.getPartition(0), ex);
- result.completeExceptionally(ex);
- }
- return null;
- });
- return result;
- }
-
// as described at : (PR: #836) CPP-client old client lib should not be
allowed to connect on partitioned-topic.
// So, all requests from old-cpp-client (< v1.21) must be rejected.
// Pulsar client-java lib always passes user-agent as X-Java-$version.
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index fe9ced198f4..16f1b357dcd 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -47,6 +47,8 @@ import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ResetCursorData;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.slf4j.Logger;
@@ -257,43 +259,43 @@ public class PersistentTopics extends
PersistentTopicsBase {
* It updates number of partitions of an existing non-global partitioned
topic. It requires partitioned-topic to be
* already exist and number of new partitions must be greater than
existing number of partitions. Decrementing
* number of partitions requires deletion of topic which is not supported.
- *
- * Already created partitioned producers and consumers can't see newly
created partitions and it requires to
- * recreate them at application so, newly created producers and consumers
can connect to newly added partitions as
- * well. Therefore, it can violate partition ordering at producers until
all producers are restarted at application.
- *
- * @param property
- * @param cluster
- * @param namespace
- * @param numPartitions
*/
@POST
@Path("/{property}/{cluster}/{namespace}/{topic}/partitions")
@ApiOperation(hidden = true, value = "Increment partitions of an existing
partitioned topic.",
notes = "It only increments partitions of existing non-global
partitioned-topic")
@ApiResponses(value = {
+ @ApiResponse(code = 204, message = "Update topic partition
successful."),
@ApiResponse(code = 307, message = "Current broker doesn't serve
the namespace of this topic"),
- @ApiResponse(code = 403, message = "Don't have admin permission"),
- @ApiResponse(code = 404, message = "Namespace or topic does not
exist"),
- @ApiResponse(code = 406, message = "The number of partitions
should be more than 0"
- + " and less than or equal to
maxNumPartitionsPerPartitionedTopic"
+ @ApiResponse(code = 401, message = "Unauthenticated"),
+ @ApiResponse(code = 403, message = "Forbidden/Unauthorized"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 422, message = "The number of partitions
should be more than 0 and"
+ + " less than or equal to
maxNumPartitionsPerPartitionedTopic"
+ " and number of new partitions must be greater than
existing number of partitions"),
- @ApiResponse(code = 409, message = "Partitioned topic does not
exist")})
+ @ApiResponse(code = 412, message = "Partitioned topic name is
invalid"),
+ @ApiResponse(code = 500, message = "Internal server error")
+ })
public void updatePartitionedTopic(
@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster")
String cluster,
@PathParam("namespace") String namespace, @PathParam("topic")
@Encoded String encodedTopic,
- @QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean
updateLocalTopicOnly,
+ @QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean
updateLocalTopic,
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
@QueryParam("force") @DefaultValue("false") boolean force,
int numPartitions) {
validateTopicName(property, cluster, namespace, encodedTopic);
- internalUpdatePartitionedTopicAsync(numPartitions,
updateLocalTopicOnly, authoritative, force)
- .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ validateTopicPolicyOperationAsync(topicName, PolicyName.PARTITION,
PolicyOperation.WRITE)
+ .thenCompose(__ ->
internalUpdatePartitionedTopicAsync(numPartitions, updateLocalTopic, force))
+ .thenAccept(__ -> {
+ log.info("[{}][{}] Updated topic partition to {}.",
clientAppId(), topicName, numPartitions);
+ asyncResponse.resume(Response.noContent().build());
+ })
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
- log.error("[{}] Failed to update partitioned topic
{}", clientAppId(), topicName, ex);
+ log.error("[{}][{}] Failed to update partition to {}",
+ clientAppId(), topicName, numPartitions, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 796bd2bb15b..b25e4d17353 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -778,30 +778,20 @@ public class PersistentTopics extends
PersistentTopicsBase {
* It updates number of partitions of an existing non-global partitioned
topic. It requires partitioned-topic to be
* already exist and number of new partitions must be greater than
existing number of partitions. Decrementing
* number of partitions requires deletion of topic which is not supported.
- *
- * Already created partitioned producers and consumers can't see newly
created partitions and it requires to
- * recreate them at application so, newly created producers and consumers
can connect to newly added partitions as
- * well. Therefore, it can violate partition ordering at producers until
all producers are restarted at application.
- *
- * @param tenant
- * @param namespace
- * @param encodedTopic
- * @param numPartitions
*/
@POST
@Path("/{tenant}/{namespace}/{topic}/partitions")
@ApiOperation(value = "Increment partitions of an existing partitioned
topic.",
notes = "It only increments partitions of existing non-global
partitioned-topic")
@ApiResponses(value = {
+ @ApiResponse(code = 204, message = "Update topic partition
successful."),
@ApiResponse(code = 307, message = "Current broker doesn't serve
the namespace of this topic"),
- @ApiResponse(code = 401,
- message = "Don't have permission to administrate resources
on this tenant"),
- @ApiResponse(code = 403, message = "Don't have admin permission"),
- @ApiResponse(code = 404, message = "Tenant does not exist"),
- @ApiResponse(code = 406, message = "The number of partitions
should be more than 0 and"
+ @ApiResponse(code = 401, message = "Unauthenticated"),
+ @ApiResponse(code = 403, message = "Forbidden/Unauthorized"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 422, message = "The number of partitions
should be more than 0 and"
+ " less than or equal to
maxNumPartitionsPerPartitionedTopic"
+ " and number of new partitions must be greater than
existing number of partitions"),
- @ApiResponse(code = 409, message = "Partitioned topic does not
exist"),
@ApiResponse(code = 412, message = "Partitioned topic name is
invalid"),
@ApiResponse(code = 500, message = "Internal server error")
})
@@ -813,7 +803,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
- @QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean
updateLocalTopicOnly,
+ @QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean
updateLocalTopic,
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
@QueryParam("force") @DefaultValue("false") boolean force,
@@ -821,13 +811,16 @@ public class PersistentTopics extends
PersistentTopicsBase {
required = true, type = "int", defaultValue = "0")
int numPartitions) {
validatePartitionedTopicName(tenant, namespace, encodedTopic);
- validatePartitionedTopicMetadataAsync()
- .thenCompose(__ ->
internalUpdatePartitionedTopicAsync(numPartitions, updateLocalTopicOnly,
- authoritative, force))
- .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ validateTopicPolicyOperationAsync(topicName, PolicyName.PARTITION,
PolicyOperation.WRITE)
+ .thenCompose(__ ->
internalUpdatePartitionedTopicAsync(numPartitions, updateLocalTopic, force))
+ .thenAccept(__ -> {
+ log.info("[{}][{}] Updated topic partition to {}.",
clientAppId(), topicName, numPartitions);
+ asyncResponse.resume(Response.noContent().build());
+ })
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
- log.error("[{}] Failed to update partitioned topic
{}", clientAppId(), topicName, ex);
+ log.error("[{}][{}] Failed to update partition to {}",
+ clientAppId(), topicName, numPartitions, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 875bf629d4f..6b68b6b2bf6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -1637,7 +1637,7 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
true, 3);
verify(response,
timeout(5000).times(1)).resume(throwableCaptor.capture());
Assert.assertEquals(throwableCaptor.getValue().getMessage(),
- "Number of new partitions must be greater than existing number
of partitions");
+ "Expect partitions 3 can't less than current partitions 4.");
response = mock(AsyncResponse.class);
metaCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);