This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 56a7b89be5f [fix][broker] Reject create non existent persistent
partitions. (#19086)
56a7b89be5f is described below
commit 56a7b89be5fecd41fc200379c96b15e3c0ace7c3
Author: Qiang Zhao <[email protected]>
AuthorDate: Mon Jan 9 19:34:19 2023 +0800
[fix][broker] Reject create non existent persistent partitions. (#19086)
---
.../broker/admin/impl/PersistentTopicsBase.java | 266 ++++++++++-----------
.../pulsar/broker/service/BrokerService.java | 19 +-
.../apache/pulsar/broker/admin/AdminApi2Test.java | 12 +-
.../nonpersistent/NonPersistentTopicTest.java | 23 ++
.../service/persistent/PersistentTopicTest.java | 20 ++
5 files changed, 193 insertions(+), 147 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 3fb551967b9..81c9638632e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -411,67 +411,85 @@ public class PersistentTopicsBase extends AdminResource {
* recreate them at application so, newly created producers and consumers
can connect to newly added partitions as
* well. Therefore, it can violate partition ordering at producers until
all producers are restarted at application.
*
- * @param numPartitions
+ * @param expectPartitions
* @param updateLocalTopicOnly
* @param authoritative
* @param force
*/
- protected CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int
numPartitions,
+ protected CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int
expectPartitions,
boolean updateLocalTopicOnly,
boolean authoritative, boolean force) {
- if (numPartitions <= 0) {
- return FutureUtil.failedFuture(new
RestException(Status.NOT_ACCEPTABLE,
- "Number of partitions should be more than 0"));
+ if (expectPartitions <= 0) {
+ return FutureUtil.failedFuture(
+ new RestException(Status.NOT_ACCEPTABLE, "Number of
partitions should be more than 0"));
}
return validateTopicOwnershipAsync(topicName, authoritative)
- .thenCompose(__ -> validateTopicPolicyOperationAsync(topicName,
PolicyName.PARTITION,
- PolicyOperation.WRITE))
+ .thenCompose(__ ->
+ validateTopicPolicyOperationAsync(topicName,
PolicyName.PARTITION, PolicyOperation.WRITE))
.thenCompose(__ -> {
if (!updateLocalTopicOnly && !force) {
- return
validatePartitionTopicUpdateAsync(topicName.getLocalName(), numPartitions);
+ return
validatePartitionTopicUpdateAsync(topicName.getLocalName(), expectPartitions);
} else {
return CompletableFuture.completedFuture(null);
}
}).thenCompose(__ ->
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName))
.thenCompose(topicMetadata -> {
final int maxPartitions =
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
- if (maxPartitions > 0 && numPartitions > maxPartitions) {
+ if (maxPartitions > 0 && expectPartitions > maxPartitions) {
throw new RestException(Status.NOT_ACCEPTABLE,
"Number of partitions should be less than or equal
to " + maxPartitions);
}
- // Only do the validation if it's the first hop.
- if (topicName.isGlobal() &&
isNamespaceReplicated(topicName.getNamespaceObject())) {
- return
getNamespaceReplicatedClustersAsync(topicName.getNamespaceObject())
- .thenApply(clusters -> {
- if
(!clusters.contains(pulsar().getConfig().getClusterName())) {
- log.error("[{}] local cluster is not part
of replicated cluster for namespace {}",
- clientAppId(), topicName);
- throw new RestException(Status.FORBIDDEN,
"Local cluster is not part of replicate"
- + " cluster list");
- }
- return clusters;
- })
- .thenCompose(clusters ->
-
tryCreateExtendedPartitionsAsync(topicMetadata.partitions, numPartitions)
- .thenApply(ignore -> clusters))
- .thenCompose(clusters ->
createSubscriptions(topicName, numPartitions, force).thenApply(
- ignore -> clusters))
- .thenCompose(clusters -> {
- if (!updateLocalTopicOnly) {
- return
updatePartitionInOtherCluster(numPartitions, clusters)
- .thenCompose(v ->
namespaceResources().getPartitionedTopicResources()
-
.updatePartitionedTopicAsync(topicName, p ->
- new
PartitionedTopicMetadata(numPartitions,
-
p.properties)
- ));
- } else {
- return
CompletableFuture.completedFuture(null);
- }
- });
- } else {
- return
tryCreateExtendedPartitionsAsync(topicMetadata.partitions, numPartitions)
- .thenCompose(ignore ->
updatePartitionedTopic(topicName, numPartitions, force));
+ final PulsarAdmin adminClient;
+ try {
+ adminClient = pulsar().getAdminClient();
+ } catch (PulsarServerException e) {
+ throw new RuntimeException(e);
}
+ return
adminClient.topics().getListAsync(topicName.getNamespace())
+ .thenCompose(topics -> {
+ long existPartitions = topics.stream()
+ .filter(t ->
TopicName.get(t).getPartitionedTopicName()
+
.equals(topicName.getPartitionedTopicName()))
+ .count();
+ if (existPartitions >= expectPartitions) {
+ throw new RestException(Status.CONFLICT,
+ "Number of new partitions must be
greater than existing number of partitions");
+ }
+ // Only do the validation if it's the first hop.
+ if (topicName.isGlobal() &&
isNamespaceReplicated(topicName.getNamespaceObject())) {
+ return
getNamespaceReplicatedClustersAsync(topicName.getNamespaceObject())
+ .thenApply(clusters -> {
+ if
(!clusters.contains(pulsar().getConfig().getClusterName())) {
+ log.error("[{}] local cluster
is not part of"
+ + " replicated
cluster for namespace {}",
+ clientAppId(),
topicName);
+ throw new
RestException(Status.FORBIDDEN,
+ "Local cluster is not
part of replicate cluster list");
+ }
+ return clusters;
+ })
+ .thenCompose(clusters ->
+
tryCreatePartitionsAsync(expectPartitions)
+ .thenApply(ignore ->
clusters))
+ .thenCompose(clusters -> {
+ if (!updateLocalTopicOnly) {
+ return
namespaceResources().getPartitionedTopicResources()
+
.updatePartitionedTopicAsync(topicName, p ->
+ new
PartitionedTopicMetadata(expectPartitions,
+
p.properties))
+ .thenCompose(__ ->
+
updatePartitionInOtherCluster(expectPartitions,
+
clusters));
+ } else {
+ return
CompletableFuture.completedFuture(null);
+ }
+ }).thenCompose(clusters ->
createSubscriptions(topicName,
+ expectPartitions));
+ } else {
+ return
tryCreatePartitionsAsync(expectPartitions)
+ .thenCompose(ignore ->
updatePartitionedTopic(topicName, expectPartitions));
+ }
+ });
});
}
@@ -4363,124 +4381,100 @@ public class PersistentTopicsBase extends
AdminResource {
}
}
- private CompletableFuture<Void> updatePartitionedTopic(TopicName
topicName, int numPartitions, boolean force) {
- CompletableFuture<Void> result = new CompletableFuture<>();
- createSubscriptions(topicName, numPartitions, force).thenCompose(__ ->
{
- CompletableFuture<Void> future =
namespaceResources().getPartitionedTopicResources()
- .updatePartitionedTopicAsync(topicName, p ->
- new PartitionedTopicMetadata(numPartitions,
p.properties));
- future.exceptionally(ex -> {
- // If the update operation fails, clean up the partitions that
were created
- getPartitionedTopicMetadataAsync(topicName, false,
false).thenAccept(metadata -> {
- int oldPartition = metadata.partitions;
- for (int i = oldPartition; i < numPartitions; i++) {
-
topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1
-> {
- log.warn("[{}] Failed to clean up managedLedger
{}", clientAppId(), topicName,
- ex1.getCause());
- return null;
- });
- }
- }).exceptionally(e -> {
- log.warn("[{}] Failed to clean up managedLedger",
topicName, e);
- return null;
- });
+ private CompletableFuture<Void> updatePartitionedTopic(TopicName
topicName, int expectPartitions) {
+ CompletableFuture<Void> future =
namespaceResources().getPartitionedTopicResources()
+ .updatePartitionedTopicAsync(topicName, p ->
+ new PartitionedTopicMetadata(expectPartitions,
p.properties));
+ future.exceptionally(ex -> {
+ // If the update operation fails, clean up the partitions that
were created
+ getPartitionedTopicMetadataAsync(topicName, false, false)
+ .thenAccept(metadata -> {
+ int oldPartition = metadata.partitions;
+ for (int i = oldPartition; i < expectPartitions; i++) {
+
topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1
-> {
+ log.warn("[{}] Failed to clean up managedLedger {}",
clientAppId(), topicName,
+ ex1.getCause());
+ return null;
+ });
+ }
+ }).exceptionally(e -> {
+ log.warn("[{}] Failed to clean up managedLedger", topicName,
e);
return null;
});
- return future;
- }).thenAccept(__ -> result.complete(null)).exceptionally(ex -> {
- result.completeExceptionally(ex);
return null;
});
- return result;
+ return future.thenCompose(__ -> createSubscriptions(topicName,
expectPartitions));
}
/**
* It creates subscriptions for new partitions of existing
partitioned-topics.
*
* @param topicName : topic-name: persistent://prop/cluster/ns/topic
- * @param numPartitions : number partitions for the topics
- * @param ignoreConflictException : If true, ignore ConflictException:
subscription already exists for topic
+ * @param expectPartitions : number of expected partitions
*
*/
- private CompletableFuture<Void> createSubscriptions(TopicName topicName,
int numPartitions,
- boolean
ignoreConflictException) {
+ private CompletableFuture<Void> createSubscriptions(TopicName topicName,
int expectPartitions) {
CompletableFuture<Void> result = new CompletableFuture<>();
-
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).thenAccept(partitionMetadata
-> {
- if (partitionMetadata.partitions < 1) {
- result.completeExceptionally(new
RestException(Status.CONFLICT, "Topic is not partitioned topic"));
- return;
- }
-
- if (partitionMetadata.partitions >= numPartitions) {
- result.completeExceptionally(new RestException(Status.CONFLICT,
- "number of partitions must be more than existing " +
partitionMetadata.partitions));
- return;
- }
-
- PulsarAdmin admin;
- try {
- admin = pulsar().getAdminClient();
- } catch (PulsarServerException e1) {
- result.completeExceptionally(e1);
- return;
- }
+ if (expectPartitions < 1) {
+ return FutureUtil.failedFuture(new RestException(Status.CONFLICT,
"Topic is not partitioned topic"));
+ }
+ PulsarAdmin admin;
+ try {
+ admin = pulsar().getAdminClient();
+ } catch (PulsarServerException e1) {
+ return FutureUtil.failedFuture(e1);
+ }
-
admin.topics().getStatsAsync(topicName.getPartition(0).toString()).thenAccept(stats
-> {
- List<CompletableFuture<Void>> subscriptionFutures = new
ArrayList<>();
+
admin.topics().getStatsAsync(topicName.getPartition(0).toString()).thenAccept(stats
-> {
+ List<CompletableFuture<Void>> subscriptionFutures = new
ArrayList<>();
- stats.getSubscriptions().entrySet().forEach(e -> {
- String subscription = e.getKey();
- SubscriptionStats ss = e.getValue();
- if (!ss.isDurable()) {
- // We must not re-create non-durable subscriptions on
the new partitions
- return;
- }
- boolean replicated = ss.isReplicated();
-
- for (int i = partitionMetadata.partitions; i <
numPartitions; i++) {
- final String topicNamePartition =
topicName.getPartition(i).toString();
- CompletableFuture<Void> future = new
CompletableFuture<>();
-
admin.topics().createSubscriptionAsync(topicNamePartition,
- subscription, MessageId.latest,
replicated).whenComplete((__, ex) -> {
- if (ex == null) {
+ stats.getSubscriptions().entrySet().forEach(e -> {
+ String subscription = e.getKey();
+ SubscriptionStats ss = e.getValue();
+ if (!ss.isDurable()) {
+ // We must not re-create non-durable subscriptions on the
new partitions
+ return;
+ }
+ boolean replicated = ss.isReplicated();
+
+ for (int i = 0; i < expectPartitions; i++) {
+ final String topicNamePartition =
topicName.getPartition(i).toString();
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ admin.topics().createSubscriptionAsync(topicNamePartition,
+ subscription, MessageId.latest,
replicated).whenComplete((__, ex) -> {
+ if (ex == null) {
+ future.complete(null);
+ } else {
+ Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
+ if (realCause instanceof
PulsarAdminException.ConflictException) {
future.complete(null);
} else {
- if (ignoreConflictException
- && ex instanceof
PulsarAdminException.ConflictException) {
- future.complete(null);
- } else {
- future.completeExceptionally(ex);
- }
+ future.completeExceptionally(realCause);
}
- });
- subscriptionFutures.add(future);
- }
- });
+ }
+ });
+ subscriptionFutures.add(future);
+ }
+ });
- FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> {
- log.info("[{}] Successfully created subscriptions on new
partitions {}", clientAppId(), topicName);
- result.complete(null);
- }).exceptionally(ex -> {
- log.warn("[{}] Failed to create subscriptions on new
partitions for {}",
- clientAppId(), topicName, ex);
- result.completeExceptionally(ex);
- return null;
- });
+ FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> {
+ log.info("[{}] Successfully created subscriptions on new
partitions {}", clientAppId(), topicName);
+ result.complete(null);
}).exceptionally(ex -> {
- if (ex.getCause() instanceof
PulsarAdminException.NotFoundException) {
- // The first partition doesn't exist, so there are
currently to subscriptions to recreate
- result.complete(null);
- } else {
- log.warn("[{}] Failed to get list of subscriptions of {}",
- clientAppId(), topicName.getPartition(0), ex);
- result.completeExceptionally(ex);
- }
+ log.warn("[{}] Failed to create subscriptions on new
partitions for {}",
+ clientAppId(), topicName, ex);
+ result.completeExceptionally(ex);
return null;
});
}).exceptionally(ex -> {
- log.warn("[{}] Failed to get partition metadata for {}",
- clientAppId(), topicName.toString());
- result.completeExceptionally(ex);
+ if (ex.getCause() instanceof
PulsarAdminException.NotFoundException) {
+ // The first partition doesn't exist, so there are currently
no subscriptions to recreate
+ result.complete(null);
+ } else {
+ log.warn("[{}] Failed to get list of subscriptions of {}",
+ clientAppId(), topicName.getPartition(0), ex);
+ result.completeExceptionally(ex);
+ }
return null;
});
return result;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index ee0ad6e103b..65a5795d59e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1017,11 +1017,22 @@ public class BrokerService implements Closeable {
}
final boolean isPersistentTopic =
topicName.getDomain().equals(TopicDomain.persistent);
if (isPersistentTopic) {
- return topics.computeIfAbsent(topicName.toString(), (k) -> {
- return this.loadOrCreatePersistentTopic(k,
createIfMissing, properties);
+ return topics.computeIfAbsent(topicName.toString(), (tpName)
-> {
+ if (topicName.isPartitioned()) {
+ return
fetchPartitionedTopicMetadataAsync(TopicName.get(topicName.getPartitionedTopicName()))
+ .thenCompose((metadata) -> {
+ // Allow creating a non-partitioned persistent
topic whose name includes `partition`
+ if (metadata.partitions == 0
+ || topicName.getPartitionIndex() <
metadata.partitions) {
+ return
loadOrCreatePersistentTopic(tpName, createIfMissing, properties);
+ }
+ return
CompletableFuture.completedFuture(Optional.empty());
+ });
+ }
+ return loadOrCreatePersistentTopic(tpName,
createIfMissing, properties);
});
} else {
- return topics.computeIfAbsent(topicName.toString(), (name) -> {
+ return topics.computeIfAbsent(topicName.toString(), (name) -> {
if (topicName.isPartitioned()) {
final TopicName partitionedTopicName =
TopicName.get(topicName.getPartitionedTopicName());
return
this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata)
-> {
@@ -1035,7 +1046,7 @@ public class BrokerService implements Closeable {
} else {
return
CompletableFuture.completedFuture(Optional.empty());
}
- });
+ });
}
} catch (IllegalArgumentException e) {
log.warn("[{}] Illegalargument exception when loading topic",
topicName, e);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 11c84d990f6..e35e9311b9f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -2676,15 +2676,13 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
startPartitions);
// create a subscription for few new partition which can fail
- admin.topics().createSubscription(partitionedTopicName + "-partition-"
+ startPartitions, subName1,
- MessageId.earliest);
-
try {
- admin.topics().updatePartitionedTopic(partitionedTopicName,
newPartitions, false, false);
- } catch (PulsarAdminException.PreconditionFailedException e) {
- // Ok
+ admin.topics().createSubscription(partitionedTopicName +
"-partition-" + startPartitions, subName1,
+ MessageId.earliest);
+ fail("Unexpected behaviour");
+ } catch (PulsarAdminException.PreconditionFailedException ex) {
+ // OK
}
-
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
startPartitions);
admin.topics().updatePartitionedTopic(partitionedTopicName,
newPartitions, false, true);
// validate subscription is created for new partition.
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
index 71caa1edb52..73a1084f30f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
@@ -18,13 +18,18 @@
*/
package org.apache.pulsar.broker.service.nonpersistent;
+import lombok.Cleanup;
import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicStats;
+import org.junit.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -32,6 +37,7 @@ import org.testng.annotations.Test;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.fail;
@Test(groups = "broker")
public class NonPersistentTopicTest extends BrokerTestBase {
@@ -96,4 +102,21 @@ public class NonPersistentTopicTest extends BrokerTestBase {
assertEquals(statsAfterUnsubscribe.getBytesOutCounter(),
statsBeforeUnsubscribe.getBytesOutCounter());
assertEquals(statsAfterUnsubscribe.getMsgOutCounter(),
statsBeforeUnsubscribe.getMsgOutCounter());
}
+
+ @Test
+ public void testCreateNonExistentPartitions() throws PulsarAdminException,
PulsarClientException {
+ final String topicName =
"non-persistent://prop/ns-abc/testCreateNonExistentPartitions";
+ admin.topics().createPartitionedTopic(topicName, 4);
+ TopicName partition = TopicName.get(topicName).getPartition(4);
+ try {
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(partition.toString())
+ .create();
+ fail("unexpected behaviour");
+ } catch (PulsarClientException.TopicDoesNotExistException ignored) {
+
+ }
+
Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions,
4);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index aa05624a5b0..19c5bd5c9aa 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -29,6 +29,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.fail;
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
@@ -46,6 +47,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
@@ -402,4 +404,22 @@ public class PersistentTopicTest extends BrokerTestBase {
assertTrue(persistentSubscription.getCursor().getLastActive() >
beforeRemoveConsumerTimestamp);
assertTrue(persistentSubscription2.getCursor().getLastActive() >
beforeRemoveConsumerTimestamp);
}
+
+
+ @Test
+ public void testCreateNonExistentPartitions() throws PulsarAdminException,
PulsarClientException {
+ final String topicName =
"persistent://prop/ns-abc/testCreateNonExistentPartitions";
+ admin.topics().createPartitionedTopic(topicName, 4);
+ TopicName partition = TopicName.get(topicName).getPartition(4);
+ try {
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(partition.toString())
+ .create();
+ fail("unexpected behaviour");
+ } catch (PulsarClientException.TopicDoesNotExistException ignored) {
+
+ }
+
Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions,
4);
+ }
}