This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 9343a35255c [fix][schema] Fix cherry-pick issue from #18283 (#18555)
9343a35255c is described below

commit 9343a35255c4f27fa8551d0a96f7b62ede4d6e46
Author: Qiang Zhao <[email protected]>
AuthorDate: Mon Nov 21 15:31:19 2022 +0800

    [fix][schema] Fix cherry-pick issue from #18283 (#18555)
---
 .../broker/service/persistent/PersistentTopic.java |  3 +-
 .../service/persistent/PersistentTopicTest.java    | 72 +++++++++++++++++++++-
 2 files changed, 71 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 8c10f6ca20c..ba5a11444a6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1181,7 +1181,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             closeClientFuture.thenAccept(delete -> {
                 CompletableFuture<Void> deleteTopicAuthenticationFuture = new CompletableFuture<>();
                 brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5);
-                deleteTopicAuthenticationFuture.thenCompose(__ -> deleteSchema())
+                deleteTopicAuthenticationFuture.thenCompose(__ -> deleteSchema ? deleteSchema() :
+                                CompletableFuture.completedFuture(null))
                         .thenCompose(__ -> deleteTopicPolicies())
                         .thenCompose(__ -> transactionBufferCleanupAndClose())
                         .whenComplete((v, ex) -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index b9069417704..056208c6798 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -29,14 +29,13 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
-
+import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.Sets;
+import lombok.Data;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.pulsar.broker.service.BrokerTestBase;
@@ -44,6 +43,7 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -51,6 +51,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.awaitility.Awaitility;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -271,4 +272,69 @@ public class PersistentTopicTest extends BrokerTestBase {
             producer.close();
         }
     }
+
+    @Test
+    public void testCreateSchemaAfterDeletion() throws Exception {
+        //init namespace
+        final String myNamespace = "prop/ns";
+        admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
+        final String topicName = "persistent://prop/ns/test-create-schema-after-deletion" + UUID.randomUUID();
+
+        // create namespace
+        // Create a topic with `Person`
+        try (Producer<Person> producer = pulsarClient.newProducer(Schema.AVRO(Person.class))
+                .topic(topicName)
+                .create()
+        ) {
+            Person person = new Person();
+            person.setName("Tom Hanks");
+            person.setAge(60);
+
+            producer.send(person);
+
+        }
+
+        // delete the topic
+        admin.topics().delete(topicName);
+
+        try (Producer<Student> ignored = pulsarClient.newProducer(Schema.AVRO(Student.class))
+                .topic(topicName)
+                .create()) {
+            Assert.fail("Should fail to create a the producer with a new schema since the schema is not deleted.");
+        } catch (PulsarClientException pce) {
+            Assert.assertTrue(pce instanceof PulsarClientException.IncompatibleSchemaException);
+        }
+
+        // delete the schema
+        admin.schemas().deleteSchema(topicName);
+
+        // after deleting the schema, try to create a topic with a different schema
+        try (Producer<Student> producer = pulsarClient.newProducer(Schema.AVRO(Student.class))
+                .topic(topicName)
+                .create()
+        ) {
+            Student student = new Student();
+            student.setName("Tom Jerry");
+            student.setAge(30);
+            student.setGpa(10);
+
+            producer.send(student);
+
+        }
+    }
+
+    @Data
+    public static class Student {
+        private String name;
+        private int age;
+        private int gpa;
+        private int grade;
+
+    }
+
+    @Data
+    public static class Person {
+        private String name;
+        private int age;
+    }
 }

Reply via email to