merlimat closed pull request #2203: Make sure schema is initialized before the 
topic is loaded
URL: https://github.com/apache/incubator-pulsar/pull/2203
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 2a341afba7..bbc3cfd77b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -563,7 +563,7 @@ public void loadNamespaceTopics(NamespaceBundle bundle) {
                 try {
                     TopicName topicName = TopicName.get(topic);
                     if (bundle.includes(topicName)) {
-                        CompletableFuture<Topic> future = 
brokerService.getOrCreateTopic(topic);
+                        CompletableFuture<Topic> future = 
brokerService.getOrCreateTopic(topic, null);
                         if (future != null) {
                             persistentTopics.add(future);
                         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 946980a2bf..cf8355f4c9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -486,7 +486,7 @@ protected void internalDeleteTopic(boolean authoritative, 
boolean force) {
             internalDeleteTopic(authoritative);
         }
     }
-    
+
     protected void internalDeleteTopic(boolean authoritative) {
         validateAdminOperationOnTopic(authoritative);
         Topic topic = getTopicReference(topicName);
@@ -1191,7 +1191,7 @@ private Topic getTopicReference(TopicName topicName) {
     }
 
     private Topic getOrCreateTopic(TopicName topicName) {
-        return 
pulsar().getBrokerService().getOrCreateTopic(topicName.toString()).join();
+        return 
pulsar().getBrokerService().getOrCreateTopic(topicName.toString(), null).join();
     }
 
     /**
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 741afbfefb..2f08137d65 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -112,6 +112,7 @@
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.schema.SchemaData;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.FieldParser;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -450,14 +451,19 @@ public void unloadNamespaceBundlesGracefully() {
     }
 
     public CompletableFuture<Optional<Topic>> getTopicIfExists(final String 
topic) {
-        return getTopic(topic, false /* createIfMissing */ );
+        return getTopic(topic, false /* createIfMissing */, null /* schemaData 
*/ );
     }
 
     public CompletableFuture<Topic> getOrCreateTopic(final String topic) {
-        return getTopic(topic, true /* createIfMissing */ 
).thenApply(Optional::get);
+        return getOrCreateTopic(topic, null);
     }
 
-    private CompletableFuture<Optional<Topic>> getTopic(final String topic, 
boolean createIfMissing) {
+    public CompletableFuture<Topic> getOrCreateTopic(final String topic, 
SchemaData schemaData) {
+        return getTopic(topic, true /* createIfMissing */, schemaData 
).thenApply(Optional::get);
+    }
+
+    private CompletableFuture<Optional<Topic>> getTopic(final String topic, 
boolean createIfMissing,
+            SchemaData schemaData) {
         try {
             CompletableFuture<Optional<Topic>> topicFuture = topics.get(topic);
             if (topicFuture != null) {
@@ -471,8 +477,8 @@ public void unloadNamespaceBundlesGracefully() {
             }
             final boolean isPersistentTopic = 
TopicName.get(topic).getDomain().equals(TopicDomain.persistent);
             return topics.computeIfAbsent(topic, (topicName) -> {
-                return isPersistentTopic ? 
this.loadOrCreatePersistentTopic(topicName, createIfMissing)
-                        : createNonPersistentTopic(topicName);
+                return isPersistentTopic ? 
this.loadOrCreatePersistentTopic(topicName, createIfMissing, schemaData)
+                        : createNonPersistentTopic(topicName, schemaData);
             });
         } catch (IllegalArgumentException e) {
             log.warn("[{}] Illegalargument exception when loading topic", 
topic, e);
@@ -489,7 +495,7 @@ public void unloadNamespaceBundlesGracefully() {
         }
     }
 
-    private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String 
topic) {
+    private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String 
topic, SchemaData schemaData) {
         CompletableFuture<Optional<Topic>> topicFuture = new 
CompletableFuture<>();
 
         if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
@@ -520,7 +526,15 @@ public void unloadNamespaceBundlesGracefully() {
             return null;
         });
 
-        return topicFuture;
+        return topicFuture.thenCompose(ot -> {
+            if (ot.isPresent()) {
+                // If a schema is provided, add or validate it before the
+                // topic is "visible"
+                return ot.get().addSchema(schemaData).thenApply(schemaVersion 
-> ot);
+            } else {
+                return CompletableFuture.completedFuture(ot);
+            }
+        });
     }
 
     private static <T> CompletableFuture<T> failedFuture(Throwable t) {
@@ -577,7 +591,8 @@ public PulsarClient getReplicationClient(String cluster) {
      * @return CompletableFuture<Topic>
      * @throws RuntimeException
      */
-    protected CompletableFuture<Optional<Topic>> 
loadOrCreatePersistentTopic(final String topic, boolean createIfMissing) throws 
RuntimeException {
+    protected CompletableFuture<Optional<Topic>> 
loadOrCreatePersistentTopic(final String topic,
+            boolean createIfMissing, SchemaData schemaData) throws 
RuntimeException {
         checkTopicNsOwnership(topic);
 
         final CompletableFuture<Optional<Topic>> topicFuture = new 
CompletableFuture<>();
@@ -605,7 +620,15 @@ public PulsarClient getReplicationClient(String cluster) {
                 log.debug("topic-loading for {} added into pending queue", 
topic);
             }
         }
-        return topicFuture;
+        return topicFuture.thenCompose(ot -> {
+            if (ot.isPresent()) {
+                // If a schema is provided, add or validate it before the
+                // topic is "visible"
+                return ot.get().addSchema(schemaData).thenApply(schemaVersion 
-> ot);
+            } else {
+                return CompletableFuture.completedFuture(ot);
+            }
+        });
     }
 
     private void createPersistentTopic(final String topic, boolean 
createIfMissing, CompletableFuture<Optional<Topic>> topicFuture) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4b45f72c16..f539bd6a25 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -301,7 +301,7 @@ protected void 
handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
         if (topicName == null) {
             return;
         }
-        
+
         final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
         if (lookupSemaphore.tryAcquire()) {
             if (invalidOriginalPrincipal(originalPrincipal)) {
@@ -578,7 +578,7 @@ protected void handleSubscribe(final CommandSubscribe 
subscribe) {
                             }
                         }
 
-                        service.getOrCreateTopic(topicName.toString())
+                        service.getOrCreateTopic(topicName.toString(), schema)
                                 .thenCompose(topic -> {
                                     if (schema != null) {
                                         return 
topic.isSchemaCompatible(schema).thenCompose(isCompatible -> {
@@ -785,7 +785,7 @@ protected void handleProducer(final CommandProducer 
cmdProducer) {
 
                         log.info("[{}][{}] Creating producer. producerId={}", 
remoteAddress, topicName, producerId);
 
-                        
service.getOrCreateTopic(topicName.toString()).thenAccept((Topic topic) -> {
+                        service.getOrCreateTopic(topicName.toString(), 
schema).thenAccept((Topic topic) -> {
                             // Before creating producer, check if backlog 
quota exceeded
                             // on topic
                             if (topic.isBacklogQuotaExceeded(producerName)) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index de55df3eeb..e13549e7a6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -415,14 +415,14 @@ void removeSubscription(String subscriptionName) {
 
     /**
      * Forcefully close all producers/consumers/replicators and deletes the 
topic.
-     * 
+     *
      * @return
      */
     @Override
     public CompletableFuture<Void> deleteForcefully() {
         return delete(false, true);
     }
-    
+
     private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, 
boolean closeIfClientsConnected) {
         CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
 
@@ -1016,6 +1016,10 @@ public void markBatchMessagePublished() {
 
     @Override
     public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
+        if (schema == null) {
+            return CompletableFuture.completedFuture(SchemaVersion.Empty);
+        }
+
         String base = TopicName.get(getName()).getPartitionedTopicName();
         String id = TopicName.get(base).getSchemaName();
         return brokerService.pulsar()
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c66e10bbc3..4d289c4260 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1792,6 +1792,10 @@ public synchronized OffloadProcessStatus offloadStatus() 
{
 
     @Override
     public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
+        if (schema == null) {
+            return CompletableFuture.completedFuture(SchemaVersion.Empty);
+        }
+
         String base = TopicName.get(getName()).getPartitionedTopicName();
         String id = TopicName.get(base).getSchemaName();
         return brokerService.pulsar()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 4ecdae497b..ce023c3bd6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -171,7 +171,7 @@ public void testCrashBrokerWithoutCursorLedgerLeak() throws 
Exception {
             consumer.acknowledge(msg);
         }
 
-        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topic1).get();
+        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topic1, null).get();
         ManagedCursorImpl cursor = (ManagedCursorImpl) 
topic.getManagedLedger().getCursors().iterator().next();
         retryStrategically((test) -> cursor.getState().equals("Open"), 5, 100);
 
@@ -206,7 +206,7 @@ public void testCrashBrokerWithoutCursorLedgerLeak() throws 
Exception {
         }
 
         // (5) Broker should create new cursor-ledger and remove old 
cursor-ledger
-        topic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topic1).get();
+        topic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topic1, null).get();
         final ManagedCursorImpl cursor1 = (ManagedCursorImpl) 
topic.getManagedLedger().getCursors().iterator().next();
         retryStrategically((test) -> cursor1.getState().equals("Open"), 5, 
100);
         long newCursorLedgerId = cursor1.getCursorLedger();
@@ -254,7 +254,7 @@ public void testSkipCorruptDataLedger() throws Exception {
         Consumer<byte[]> consumer = 
client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name")
                 .receiverQueueSize(5).subscribe();
 
-        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topic1).get();
+        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topic1, null).get();
         ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();
         ManagedCursorImpl cursor = (ManagedCursorImpl) 
ml.getCursors().iterator().next();
         Field configField = ManagedCursorImpl.class.getDeclaredField("config");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 9fa40d41a4..b9b1b847d6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -105,7 +105,7 @@ public void testOwnedNsCheck() throws Exception {
         BrokerService service = pulsar.getBrokerService();
 
         final CountDownLatch latch1 = new CountDownLatch(1);
-        service.getOrCreateTopic(topic).thenAccept(t -> {
+        service.getOrCreateTopic(topic, null).thenAccept(t -> {
             latch1.countDown();
             fail("should fail as NS is not owned");
         }).exceptionally(exception -> {
@@ -118,7 +118,7 @@ public void testOwnedNsCheck() throws Exception {
         admin.lookups().lookupTopic(topic);
 
         final CountDownLatch latch2 = new CountDownLatch(1);
-        service.getOrCreateTopic(topic).thenAccept(t -> {
+        service.getOrCreateTopic(topic, null).thenAccept(t -> {
             try {
                 assertNotNull(service.getTopicReference(topic));
             } catch (Exception e) {
@@ -746,7 +746,8 @@ public void testTopicLoadingOnDisableNamespaceBundle() 
throws Exception {
         
pulsar.getNamespaceService().getOwnershipCache().updateBundleState(bundle, 
false);
 
         // try to create topic which should fail as bundle is disable
-        CompletableFuture<Optional<Topic>> futureResult = 
pulsar.getBrokerService().loadOrCreatePersistentTopic(topicName, true);
+        CompletableFuture<Optional<Topic>> futureResult = 
pulsar.getBrokerService()
+                .loadOrCreatePersistentTopic(topicName, true, null);
 
         try {
             futureResult.get();
@@ -789,7 +790,7 @@ public void testTopicFailureShouldNotHaveDeadLock() {
 
         // create topic async and wait on the future completion
         executor.submit(() -> {
-            service.getOrCreateTopic(deadLockTestTopic).thenAccept(topic -> 
topicCreation.complete(null)).exceptionally(e -> {
+            service.getOrCreateTopic(deadLockTestTopic, null).thenAccept(topic 
-> topicCreation.complete(null)).exceptionally(e -> {
                 topicCreation.completeExceptionally(e.getCause());
                 return null;
             });
@@ -841,7 +842,7 @@ public void testLedgerOpenFailureShouldNotHaveDeadLock() 
throws Exception {
 
         // create topic async and wait on the future completion
         executor.submit(() -> {
-            service.getOrCreateTopic(deadLockTestTopic).thenAccept(topic -> 
topicCreation.complete(null)).exceptionally(e -> {
+            service.getOrCreateTopic(deadLockTestTopic, null).thenAccept(topic 
-> topicCreation.complete(null)).exceptionally(e -> {
                 topicCreation.completeExceptionally(e.getCause());
                 return null;
             });


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to