This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 56a7b89be5f [fix][broker] Reject create non existent persistent 
partitions. (#19086)
56a7b89be5f is described below

commit 56a7b89be5fecd41fc200379c96b15e3c0ace7c3
Author: Qiang Zhao <mattisonc...@apache.org>
AuthorDate: Mon Jan 9 19:34:19 2023 +0800

    [fix][broker] Reject create non existent persistent partitions. (#19086)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 266 ++++++++++-----------
 .../pulsar/broker/service/BrokerService.java       |  19 +-
 .../apache/pulsar/broker/admin/AdminApi2Test.java  |  12 +-
 .../nonpersistent/NonPersistentTopicTest.java      |  23 ++
 .../service/persistent/PersistentTopicTest.java    |  20 ++
 5 files changed, 193 insertions(+), 147 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 3fb551967b9..81c9638632e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -411,67 +411,85 @@ public class PersistentTopicsBase extends AdminResource {
      * recreate them at application so, newly created producers and consumers 
can connect to newly added partitions as
      * well. Therefore, it can violate partition ordering at producers until 
all producers are restarted at application.
      *
-     * @param numPartitions
+     * @param expectPartitions
      * @param updateLocalTopicOnly
      * @param authoritative
      * @param force
      */
-    protected CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int 
numPartitions,
+    protected CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int 
expectPartitions,
                                                                           
boolean updateLocalTopicOnly,
                                                                           
boolean authoritative, boolean force) {
-        if (numPartitions <= 0) {
-            return FutureUtil.failedFuture(new 
RestException(Status.NOT_ACCEPTABLE,
-                    "Number of partitions should be more than 0"));
+        if (expectPartitions <= 0) {
+            return FutureUtil.failedFuture(
+                    new RestException(Status.NOT_ACCEPTABLE, "Number of 
partitions should be more than 0"));
         }
         return validateTopicOwnershipAsync(topicName, authoritative)
-            .thenCompose(__ -> validateTopicPolicyOperationAsync(topicName, 
PolicyName.PARTITION,
-                    PolicyOperation.WRITE))
+            .thenCompose(__ ->
+                    validateTopicPolicyOperationAsync(topicName, 
PolicyName.PARTITION, PolicyOperation.WRITE))
             .thenCompose(__ -> {
                 if (!updateLocalTopicOnly && !force) {
-                    return 
validatePartitionTopicUpdateAsync(topicName.getLocalName(), numPartitions);
+                    return 
validatePartitionTopicUpdateAsync(topicName.getLocalName(), expectPartitions);
                 }  else {
                     return CompletableFuture.completedFuture(null);
                 }
             }).thenCompose(__ -> 
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName))
             .thenCompose(topicMetadata -> {
                 final int maxPartitions = 
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
-                if (maxPartitions > 0 && numPartitions > maxPartitions) {
+                if (maxPartitions > 0 && expectPartitions > maxPartitions) {
                     throw new RestException(Status.NOT_ACCEPTABLE,
                             "Number of partitions should be less than or equal 
to " + maxPartitions);
                 }
-                // Only do the validation if it's the first hop.
-                if (topicName.isGlobal() && 
isNamespaceReplicated(topicName.getNamespaceObject())) {
-                    return 
getNamespaceReplicatedClustersAsync(topicName.getNamespaceObject())
-                            .thenApply(clusters -> {
-                                if 
(!clusters.contains(pulsar().getConfig().getClusterName())) {
-                                    log.error("[{}] local cluster is not part 
of replicated cluster for namespace {}",
-                                    clientAppId(), topicName);
-                                    throw new RestException(Status.FORBIDDEN, 
"Local cluster is not part of replicate"
-                                            + " cluster list");
-                                }
-                                return clusters;
-                            })
-                            .thenCompose(clusters ->
-                                    
tryCreateExtendedPartitionsAsync(topicMetadata.partitions, numPartitions)
-                                            .thenApply(ignore -> clusters))
-                            .thenCompose(clusters -> 
createSubscriptions(topicName, numPartitions, force).thenApply(
-                                    ignore -> clusters))
-                            .thenCompose(clusters -> {
-                                if (!updateLocalTopicOnly) {
-                                    return 
updatePartitionInOtherCluster(numPartitions, clusters)
-                                        .thenCompose(v -> 
namespaceResources().getPartitionedTopicResources()
-                                                        
.updatePartitionedTopicAsync(topicName, p ->
-                                                                new 
PartitionedTopicMetadata(numPartitions,
-                                                                    
p.properties)
-                                                        ));
-                                } else {
-                                    return 
CompletableFuture.completedFuture(null);
-                                }
-                            });
-                } else {
-                    return 
tryCreateExtendedPartitionsAsync(topicMetadata.partitions, numPartitions)
-                            .thenCompose(ignore -> 
updatePartitionedTopic(topicName, numPartitions, force));
+                final PulsarAdmin adminClient;
+                try {
+                    adminClient = pulsar().getAdminClient();
+                } catch (PulsarServerException e) {
+                    throw new RuntimeException(e);
                 }
+                return 
adminClient.topics().getListAsync(topicName.getNamespace())
+                        .thenCompose(topics -> {
+                            long existPartitions = topics.stream()
+                                    .filter(t -> 
TopicName.get(t).getPartitionedTopicName()
+                                            
.equals(topicName.getPartitionedTopicName()))
+                                    .count();
+                            if (existPartitions >= expectPartitions) {
+                                throw new RestException(Status.CONFLICT,
+                                        "Number of new partitions must be 
greater than existing number of partitions");
+                            }
+                            // Only do the validation if it's the first hop.
+                            if (topicName.isGlobal() && 
isNamespaceReplicated(topicName.getNamespaceObject())) {
+                                return 
getNamespaceReplicatedClustersAsync(topicName.getNamespaceObject())
+                                        .thenApply(clusters -> {
+                                            if 
(!clusters.contains(pulsar().getConfig().getClusterName())) {
+                                                log.error("[{}] local cluster 
is not part of"
+                                                                + " replicated 
cluster for namespace {}",
+                                                        clientAppId(), 
topicName);
+                                                throw new 
RestException(Status.FORBIDDEN,
+                                                        "Local cluster is not 
part of replicate cluster list");
+                                            }
+                                            return clusters;
+                                        })
+                                        .thenCompose(clusters ->
+                                                
tryCreatePartitionsAsync(expectPartitions)
+                                                        .thenApply(ignore -> 
clusters))
+                                        .thenCompose(clusters -> {
+                                            if (!updateLocalTopicOnly) {
+                                                return 
namespaceResources().getPartitionedTopicResources()
+                                                        
.updatePartitionedTopicAsync(topicName, p ->
+                                                                new 
PartitionedTopicMetadata(expectPartitions,
+                                                                        
p.properties))
+                                                        .thenCompose(__ ->
+                                                                
updatePartitionInOtherCluster(expectPartitions,
+                                                                        
clusters));
+                                            } else {
+                                                return 
CompletableFuture.completedFuture(null);
+                                            }
+                                        }).thenCompose(clusters -> 
createSubscriptions(topicName,
+                                                expectPartitions));
+                            } else {
+                                return 
tryCreatePartitionsAsync(expectPartitions)
+                                        .thenCompose(ignore -> 
updatePartitionedTopic(topicName, expectPartitions));
+                            }
+                        });
             });
     }
 
@@ -4363,124 +4381,100 @@ public class PersistentTopicsBase extends 
AdminResource {
         }
     }
 
-    private CompletableFuture<Void> updatePartitionedTopic(TopicName 
topicName, int numPartitions, boolean force) {
-        CompletableFuture<Void> result = new CompletableFuture<>();
-        createSubscriptions(topicName, numPartitions, force).thenCompose(__ -> 
{
-            CompletableFuture<Void> future = 
namespaceResources().getPartitionedTopicResources()
-                    .updatePartitionedTopicAsync(topicName, p ->
-                        new PartitionedTopicMetadata(numPartitions, 
p.properties));
-            future.exceptionally(ex -> {
-                // If the update operation fails, clean up the partitions that 
were created
-                getPartitionedTopicMetadataAsync(topicName, false, 
false).thenAccept(metadata -> {
-                    int oldPartition = metadata.partitions;
-                    for (int i = oldPartition; i < numPartitions; i++) {
-                        
topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1
 -> {
-                            log.warn("[{}] Failed to clean up managedLedger 
{}", clientAppId(), topicName,
-                                    ex1.getCause());
-                            return null;
-                        });
-                    }
-                }).exceptionally(e -> {
-                    log.warn("[{}] Failed to clean up managedLedger", 
topicName, e);
-                    return null;
-                });
+    private CompletableFuture<Void> updatePartitionedTopic(TopicName 
topicName, int expectPartitions) {
+        CompletableFuture<Void> future = 
namespaceResources().getPartitionedTopicResources()
+                .updatePartitionedTopicAsync(topicName, p ->
+                        new PartitionedTopicMetadata(expectPartitions, 
p.properties));
+        future.exceptionally(ex -> {
+            // If the update operation fails, clean up the partitions that 
were created
+            getPartitionedTopicMetadataAsync(topicName, false, false)
+                    .thenAccept(metadata -> {
+                int oldPartition = metadata.partitions;
+                for (int i = oldPartition; i < expectPartitions; i++) {
+                    
topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1
 -> {
+                        log.warn("[{}] Failed to clean up managedLedger {}", 
clientAppId(), topicName,
+                                ex1.getCause());
+                        return null;
+                    });
+                }
+            }).exceptionally(e -> {
+                log.warn("[{}] Failed to clean up managedLedger", topicName, 
e);
                 return null;
             });
-            return future;
-        }).thenAccept(__ -> result.complete(null)).exceptionally(ex -> {
-            result.completeExceptionally(ex);
             return null;
         });
-        return result;
+        return future.thenCompose(__ -> createSubscriptions(topicName, 
expectPartitions));
     }
 
     /**
      * It creates subscriptions for new partitions of existing 
partitioned-topics.
      *
      * @param topicName     : topic-name: persistent://prop/cluster/ns/topic
-     * @param numPartitions : number partitions for the topics
-     * @param ignoreConflictException : If true, ignore ConflictException: 
subscription already exists for topic
+     * @param expectPartitions : number of expected partitions
      *
      */
-    private CompletableFuture<Void> createSubscriptions(TopicName topicName, 
int numPartitions,
-                                                       boolean 
ignoreConflictException) {
+    private CompletableFuture<Void> createSubscriptions(TopicName topicName, 
int expectPartitions) {
         CompletableFuture<Void> result = new CompletableFuture<>();
-        
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).thenAccept(partitionMetadata
 -> {
-            if (partitionMetadata.partitions < 1) {
-                result.completeExceptionally(new 
RestException(Status.CONFLICT, "Topic is not partitioned topic"));
-                return;
-            }
-
-            if (partitionMetadata.partitions >= numPartitions) {
-                result.completeExceptionally(new RestException(Status.CONFLICT,
-                        "number of partitions must be more than existing " + 
partitionMetadata.partitions));
-                return;
-            }
-
-            PulsarAdmin admin;
-            try {
-                admin = pulsar().getAdminClient();
-            } catch (PulsarServerException e1) {
-                result.completeExceptionally(e1);
-                return;
-            }
+        if (expectPartitions < 1) {
+            return FutureUtil.failedFuture(new RestException(Status.CONFLICT, 
"Topic is not partitioned topic"));
+        }
+        PulsarAdmin admin;
+        try {
+            admin = pulsar().getAdminClient();
+        } catch (PulsarServerException e1) {
+            return FutureUtil.failedFuture(e1);
+        }
 
-            
admin.topics().getStatsAsync(topicName.getPartition(0).toString()).thenAccept(stats
 -> {
-                List<CompletableFuture<Void>> subscriptionFutures = new 
ArrayList<>();
+        
admin.topics().getStatsAsync(topicName.getPartition(0).toString()).thenAccept(stats
 -> {
+            List<CompletableFuture<Void>> subscriptionFutures = new 
ArrayList<>();
 
-                stats.getSubscriptions().entrySet().forEach(e -> {
-                    String subscription = e.getKey();
-                    SubscriptionStats ss = e.getValue();
-                    if (!ss.isDurable()) {
-                        // We must not re-create non-durable subscriptions on 
the new partitions
-                        return;
-                    }
-                    boolean replicated = ss.isReplicated();
-
-                    for (int i = partitionMetadata.partitions; i < 
numPartitions; i++) {
-                        final String topicNamePartition = 
topicName.getPartition(i).toString();
-                        CompletableFuture<Void> future = new 
CompletableFuture<>();
-                        
admin.topics().createSubscriptionAsync(topicNamePartition,
-                                        subscription, MessageId.latest, 
replicated).whenComplete((__, ex) -> {
-                            if (ex == null) {
+            stats.getSubscriptions().entrySet().forEach(e -> {
+                String subscription = e.getKey();
+                SubscriptionStats ss = e.getValue();
+                if (!ss.isDurable()) {
+                    // We must not re-create non-durable subscriptions on the 
new partitions
+                    return;
+                }
+                boolean replicated = ss.isReplicated();
+
+                for (int i = 0; i < expectPartitions; i++) {
+                    final String topicNamePartition = 
topicName.getPartition(i).toString();
+                    CompletableFuture<Void> future = new CompletableFuture<>();
+                    admin.topics().createSubscriptionAsync(topicNamePartition,
+                                    subscription, MessageId.latest, 
replicated).whenComplete((__, ex) -> {
+                        if (ex == null) {
+                            future.complete(null);
+                        } else {
+                            Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
+                            if (realCause instanceof 
PulsarAdminException.ConflictException) {
                                 future.complete(null);
                             } else {
-                                if (ignoreConflictException
-                                        && ex instanceof 
PulsarAdminException.ConflictException) {
-                                    future.complete(null);
-                                } else {
-                                    future.completeExceptionally(ex);
-                                }
+                                future.completeExceptionally(realCause);
                             }
-                        });
-                        subscriptionFutures.add(future);
-                    }
-                });
+                        }
+                    });
+                    subscriptionFutures.add(future);
+                }
+            });
 
-                FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> {
-                    log.info("[{}] Successfully created subscriptions on new 
partitions {}", clientAppId(), topicName);
-                    result.complete(null);
-                }).exceptionally(ex -> {
-                    log.warn("[{}] Failed to create subscriptions on new 
partitions for {}",
-                            clientAppId(), topicName, ex);
-                    result.completeExceptionally(ex);
-                    return null;
-                });
+            FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> {
+                log.info("[{}] Successfully created subscriptions on new 
partitions {}", clientAppId(), topicName);
+                result.complete(null);
             }).exceptionally(ex -> {
-                if (ex.getCause() instanceof 
PulsarAdminException.NotFoundException) {
-                    // The first partition doesn't exist, so there are 
currently to subscriptions to recreate
-                    result.complete(null);
-                } else {
-                    log.warn("[{}] Failed to get list of subscriptions of {}",
-                            clientAppId(), topicName.getPartition(0), ex);
-                    result.completeExceptionally(ex);
-                }
+                log.warn("[{}] Failed to create subscriptions on new 
partitions for {}",
+                        clientAppId(), topicName, ex);
+                result.completeExceptionally(ex);
                 return null;
             });
         }).exceptionally(ex -> {
-            log.warn("[{}] Failed to get partition metadata for {}",
-                    clientAppId(), topicName.toString());
-            result.completeExceptionally(ex);
+            if (ex.getCause() instanceof 
PulsarAdminException.NotFoundException) {
+                // The first partition doesn't exist, so there are currently 
no subscriptions to recreate
+                result.complete(null);
+            } else {
+                log.warn("[{}] Failed to get list of subscriptions of {}",
+                        clientAppId(), topicName.getPartition(0), ex);
+                result.completeExceptionally(ex);
+            }
             return null;
         });
         return result;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index ee0ad6e103b..65a5795d59e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1017,11 +1017,22 @@ public class BrokerService implements Closeable {
             }
             final boolean isPersistentTopic = 
topicName.getDomain().equals(TopicDomain.persistent);
             if (isPersistentTopic) {
-                return topics.computeIfAbsent(topicName.toString(), (k) -> {
-                    return this.loadOrCreatePersistentTopic(k, 
createIfMissing, properties);
+                return topics.computeIfAbsent(topicName.toString(), (tpName) 
-> {
+                    if (topicName.isPartitioned()) {
+                        return 
fetchPartitionedTopicMetadataAsync(TopicName.get(topicName.getPartitionedTopicName()))
+                                .thenCompose((metadata) -> {
+                                    // Allow creating a non-partitioned persistent 
topic whose name includes `partition`
+                                    if (metadata.partitions == 0
+                                            || topicName.getPartitionIndex() < 
metadata.partitions) {
+                                        return 
loadOrCreatePersistentTopic(tpName, createIfMissing, properties);
+                                    }
+                                    return 
CompletableFuture.completedFuture(Optional.empty());
+                                });
+                    }
+                    return loadOrCreatePersistentTopic(tpName, 
createIfMissing, properties);
                 });
             } else {
-                return topics.computeIfAbsent(topicName.toString(), (name) -> {
+            return topics.computeIfAbsent(topicName.toString(), (name) -> {
                     if (topicName.isPartitioned()) {
                         final TopicName partitionedTopicName = 
TopicName.get(topicName.getPartitionedTopicName());
                         return 
this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata)
 -> {
@@ -1035,7 +1046,7 @@ public class BrokerService implements Closeable {
                     } else {
                         return 
CompletableFuture.completedFuture(Optional.empty());
                     }
-                });
+                    });
             }
         } catch (IllegalArgumentException e) {
             log.warn("[{}] Illegalargument exception when loading topic", 
topicName, e);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 11c84d990f6..e35e9311b9f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -2676,15 +2676,13 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
 startPartitions);
 
         // create a subscription for few new partition which can fail
-        admin.topics().createSubscription(partitionedTopicName + "-partition-" 
+ startPartitions, subName1,
-                MessageId.earliest);
-
         try {
-            admin.topics().updatePartitionedTopic(partitionedTopicName, 
newPartitions, false, false);
-        } catch (PulsarAdminException.PreconditionFailedException e) {
-            // Ok
+            admin.topics().createSubscription(partitionedTopicName + 
"-partition-" + startPartitions, subName1,
+                    MessageId.earliest);
+            fail("Unexpected behaviour");
+        } catch (PulsarAdminException.PreconditionFailedException ex) {
+            // OK
         }
-        
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
 startPartitions);
 
         admin.topics().updatePartitionedTopic(partitionedTopicName, 
newPartitions, false, true);
         // validate subscription is created for new partition.
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
index 71caa1edb52..73a1084f30f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
@@ -18,13 +18,18 @@
  */
 package org.apache.pulsar.broker.service.nonpersistent;
 
+import lombok.Cleanup;
 import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.junit.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -32,6 +37,7 @@ import org.testng.annotations.Test;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.fail;
 
 @Test(groups = "broker")
 public class NonPersistentTopicTest extends BrokerTestBase {
@@ -96,4 +102,21 @@ public class NonPersistentTopicTest extends BrokerTestBase {
         assertEquals(statsAfterUnsubscribe.getBytesOutCounter(), 
statsBeforeUnsubscribe.getBytesOutCounter());
         assertEquals(statsAfterUnsubscribe.getMsgOutCounter(), 
statsBeforeUnsubscribe.getMsgOutCounter());
     }
+
+    @Test
+    public void testCreateNonExistentPartitions() throws PulsarAdminException, 
PulsarClientException {
+        final String topicName = 
"non-persistent://prop/ns-abc/testCreateNonExistentPartitions";
+        admin.topics().createPartitionedTopic(topicName, 4);
+        TopicName partition = TopicName.get(topicName).getPartition(4);
+        try {
+            @Cleanup
+            Producer<byte[]> producer = pulsarClient.newProducer()
+                    .topic(partition.toString())
+                    .create();
+            fail("unexpected behaviour");
+        } catch (PulsarClientException.TopicDoesNotExistException ignored) {
+
+        }
+        
Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions,
 4);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index aa05624a5b0..19c5bd5c9aa 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -29,6 +29,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.fail;
 import java.io.ByteArrayOutputStream;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
@@ -46,6 +47,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
@@ -402,4 +404,22 @@ public class PersistentTopicTest extends BrokerTestBase {
         assertTrue(persistentSubscription.getCursor().getLastActive() > 
beforeRemoveConsumerTimestamp);
         assertTrue(persistentSubscription2.getCursor().getLastActive() > 
beforeRemoveConsumerTimestamp);
     }
+
+
+    @Test
+    public void testCreateNonExistentPartitions() throws PulsarAdminException, 
PulsarClientException {
+        final String topicName = 
"persistent://prop/ns-abc/testCreateNonExistentPartitions";
+        admin.topics().createPartitionedTopic(topicName, 4);
+        TopicName partition = TopicName.get(topicName).getPartition(4);
+        try {
+            @Cleanup
+            Producer<byte[]> producer = pulsarClient.newProducer()
+                    .topic(partition.toString())
+                    .create();
+            fail("unexpected behaviour");
+        } catch (PulsarClientException.TopicDoesNotExistException ignored) {
+
+        }
+        
Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions,
 4);
+    }
 }

Reply via email to