This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 0896eda18c1 [fix][schema] Fix cherry-pick issue from #18283 (#18555)
0896eda18c1 is described below
commit 0896eda18c11fefce4cc5e18f77d8c49d6a646b4
Author: Qiang Zhao <[email protected]>
AuthorDate: Mon Nov 21 15:31:19 2022 +0800
[fix][schema] Fix cherry-pick issue from #18283 (#18555)
---
.../broker/service/persistent/PersistentTopic.java | 3 +-
.../service/persistent/PersistentTopicTest.java | 72 +++++++++++++++++++++-
2 files changed, 71 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 4c4c68ab9ff..df90547c723 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1179,7 +1179,8 @@ public class PersistentTopic extends AbstractTopic
closeClientFuture.thenAccept(delete -> {
CompletableFuture<Void> deleteTopicAuthenticationFuture = new
CompletableFuture<>();
brokerService.deleteTopicAuthenticationWithRetry(topic,
deleteTopicAuthenticationFuture, 5);
- deleteTopicAuthenticationFuture.thenCompose(__ ->
deleteSchema())
+ deleteTopicAuthenticationFuture.thenCompose(__ -> deleteSchema
? deleteSchema() :
+ CompletableFuture.completedFuture(null))
.thenCompose(__ -> deleteTopicPolicies())
.thenCompose(__ -> transactionBufferCleanupAndClose())
.whenComplete((v, ex) -> {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 18eddcb7465..2c48ae183df 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -29,14 +29,13 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
-
+import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.Sets;
+import lombok.Data;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.service.BrokerTestBase;
@@ -44,6 +43,7 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -51,6 +51,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -271,4 +272,69 @@ public class PersistentTopicTest extends BrokerTestBase {
producer.close();
}
}
+
+ @Test
+ public void testCreateSchemaAfterDeletion() throws Exception {
+ //init namespace
+ final String myNamespace = "prop/ns";
+ admin.namespaces().createNamespace(myNamespace,
Sets.newHashSet("test"));
+ final String topicName =
"persistent://prop/ns/test-create-schema-after-deletion" + UUID.randomUUID();
+
+ // create namespace
+ // Create a topic with `Person`
+ try (Producer<Person> producer =
pulsarClient.newProducer(Schema.AVRO(Person.class))
+ .topic(topicName)
+ .create()
+ ) {
+ Person person = new Person();
+ person.setName("Tom Hanks");
+ person.setAge(60);
+
+ producer.send(person);
+
+ }
+
+ // delete the topic
+ admin.topics().delete(topicName);
+
+ try (Producer<Student> ignored =
pulsarClient.newProducer(Schema.AVRO(Student.class))
+ .topic(topicName)
+ .create()) {
+ Assert.fail("Should fail to create a the producer with a new
schema since the schema is not deleted.");
+ } catch (PulsarClientException pce) {
+ Assert.assertTrue(pce instanceof
PulsarClientException.IncompatibleSchemaException);
+ }
+
+ // delete the schema
+ admin.schemas().deleteSchema(topicName);
+
+ // after deleting the schema, try to create a topic with a different
schema
+ try (Producer<Student> producer =
pulsarClient.newProducer(Schema.AVRO(Student.class))
+ .topic(topicName)
+ .create()
+ ) {
+ Student student = new Student();
+ student.setName("Tom Jerry");
+ student.setAge(30);
+ student.setGpa(10);
+
+ producer.send(student);
+
+ }
+ }
+
+ @Data
+ public static class Student {
+ private String name;
+ private int age;
+ private int gpa;
+ private int grade;
+
+ }
+
+ @Data
+ public static class Person {
+ private String name;
+ private int age;
+ }
}