This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2ad89f4 Make sure schema is initialized before the topic is loaded (#2203)
2ad89f4 is described below
commit 2ad89f475a0feced7242ce9de2ec2234dc24d1c7
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Jul 24 22:03:43 2018 -0700
Make sure schema is initialized before the topic is loaded (#2203)
* Make sure schema is initialized before the topic is loaded
* Added overloaded method
---
.../org/apache/pulsar/broker/PulsarService.java | 2 +-
.../broker/admin/impl/PersistentTopicsBase.java | 4 +--
.../pulsar/broker/service/BrokerService.java | 41 +++++++++++++++++-----
.../apache/pulsar/broker/service/ServerCnx.java | 4 +--
.../service/nonpersistent/NonPersistentTopic.java | 8 +++--
.../broker/service/persistent/PersistentTopic.java | 4 +++
.../broker/service/BrokerBkEnsemblesTests.java | 6 ++--
.../pulsar/broker/service/BrokerServiceTest.java | 11 +++---
8 files changed, 56 insertions(+), 24 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index c0aa9d2..8a811ac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -563,7 +563,7 @@ public class PulsarService implements AutoCloseable {
try {
TopicName topicName = TopicName.get(topic);
if (bundle.includes(topicName)) {
- CompletableFuture<Topic> future =
brokerService.getOrCreateTopic(topic);
+ CompletableFuture<Topic> future =
brokerService.getOrCreateTopic(topic, null);
if (future != null) {
persistentTopics.add(future);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 946980a..cf8355f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -486,7 +486,7 @@ public class PersistentTopicsBase extends AdminResource {
internalDeleteTopic(authoritative);
}
}
-
+
protected void internalDeleteTopic(boolean authoritative) {
validateAdminOperationOnTopic(authoritative);
Topic topic = getTopicReference(topicName);
@@ -1191,7 +1191,7 @@ public class PersistentTopicsBase extends AdminResource {
}
private Topic getOrCreateTopic(TopicName topicName) {
- return
pulsar().getBrokerService().getOrCreateTopic(topicName.toString()).join();
+ return
pulsar().getBrokerService().getOrCreateTopic(topicName.toString(), null).join();
}
/**
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 4bbfc72..182f3cc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -112,6 +112,7 @@ import
org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FieldParser;
import org.apache.pulsar.common.util.FutureUtil;
@@ -450,14 +451,19 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
}
public CompletableFuture<Optional<Topic>> getTopicIfExists(final String
topic) {
- return getTopic(topic, false /* createIfMissing */ );
+ return getTopic(topic, false /* createIfMissing */, null /* schemaData
*/ );
}
public CompletableFuture<Topic> getOrCreateTopic(final String topic) {
- return getTopic(topic, true /* createIfMissing */
).thenApply(Optional::get);
+ return getOrCreateTopic(topic, null);
}
- private CompletableFuture<Optional<Topic>> getTopic(final String topic,
boolean createIfMissing) {
+ public CompletableFuture<Topic> getOrCreateTopic(final String topic,
SchemaData schemaData) {
+ return getTopic(topic, true /* createIfMissing */, schemaData
).thenApply(Optional::get);
+ }
+
+ private CompletableFuture<Optional<Topic>> getTopic(final String topic,
boolean createIfMissing,
+ SchemaData schemaData) {
try {
CompletableFuture<Optional<Topic>> topicFuture = topics.get(topic);
if (topicFuture != null) {
@@ -471,8 +477,8 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
}
final boolean isPersistentTopic =
TopicName.get(topic).getDomain().equals(TopicDomain.persistent);
return topics.computeIfAbsent(topic, (topicName) -> {
- return isPersistentTopic ?
this.loadOrCreatePersistentTopic(topicName, createIfMissing)
- : createNonPersistentTopic(topicName);
+ return isPersistentTopic ?
this.loadOrCreatePersistentTopic(topicName, createIfMissing, schemaData)
+ : createNonPersistentTopic(topicName, schemaData);
});
} catch (IllegalArgumentException e) {
log.warn("[{}] Illegalargument exception when loading topic",
topic, e);
@@ -489,7 +495,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
}
}
- private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String
topic) {
+ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String
topic, SchemaData schemaData) {
CompletableFuture<Optional<Topic>> topicFuture = new
CompletableFuture<>();
if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
@@ -520,7 +526,15 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
return null;
});
- return topicFuture;
+ return topicFuture.thenCompose(ot -> {
+ if (ot.isPresent()) {
+ // If a schema is provided, add or validate it before the
+ // topic is "visible"
+ return ot.get().addSchema(schemaData).thenApply(schemaVersion
-> ot);
+ } else {
+ return CompletableFuture.completedFuture(ot);
+ }
+ });
}
private static <T> CompletableFuture<T> failedFuture(Throwable t) {
@@ -577,7 +591,8 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
* @return CompletableFuture<Topic>
* @throws RuntimeException
*/
- protected CompletableFuture<Optional<Topic>>
loadOrCreatePersistentTopic(final String topic, boolean createIfMissing) throws
RuntimeException {
+ protected CompletableFuture<Optional<Topic>>
loadOrCreatePersistentTopic(final String topic,
+ boolean createIfMissing, SchemaData schemaData) throws
RuntimeException {
checkTopicNsOwnership(topic);
final CompletableFuture<Optional<Topic>> topicFuture = new
CompletableFuture<>();
@@ -605,7 +620,15 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
log.debug("topic-loading for {} added into pending queue",
topic);
}
}
- return topicFuture;
+ return topicFuture.thenCompose(ot -> {
+ if (ot.isPresent()) {
+ // If a schema is provided, add or validate it before the
+ // topic is "visible"
+ return ot.get().addSchema(schemaData).thenApply(schemaVersion
-> ot);
+ } else {
+ return CompletableFuture.completedFuture(ot);
+ }
+ });
}
private void createPersistentTopic(final String topic, boolean
createIfMissing, CompletableFuture<Optional<Topic>> topicFuture) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 1d191bc..aacd7bc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -584,7 +584,7 @@ public class ServerCnx extends PulsarHandler {
}
}
- service.getOrCreateTopic(topicName.toString())
+ service.getOrCreateTopic(topicName.toString(), schema)
.thenCompose(topic -> {
if (schema != null) {
return
topic.isSchemaCompatible(schema).thenCompose(isCompatible -> {
@@ -791,7 +791,7 @@ public class ServerCnx extends PulsarHandler {
log.info("[{}][{}] Creating producer. producerId={}",
remoteAddress, topicName, producerId);
-
service.getOrCreateTopic(topicName.toString()).thenAccept((Topic topic) -> {
+ service.getOrCreateTopic(topicName.toString(),
schema).thenAccept((Topic topic) -> {
// Before creating producer, check if backlog
quota exceeded
// on topic
if (topic.isBacklogQuotaExceeded(producerName)) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index de55df3..e13549e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -415,14 +415,14 @@ public class NonPersistentTopic implements Topic {
/**
* Forcefully close all producers/consumers/replicators and deletes the
topic.
- *
+ *
* @return
*/
@Override
public CompletableFuture<Void> deleteForcefully() {
return delete(false, true);
}
-
+
private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
boolean closeIfClientsConnected) {
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
@@ -1016,6 +1016,10 @@ public class NonPersistentTopic implements Topic {
@Override
public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
+ if (schema == null) {
+ return CompletableFuture.completedFuture(SchemaVersion.Empty);
+ }
+
String base = TopicName.get(getName()).getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
return brokerService.pulsar()
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c66e10b..4d289c4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1792,6 +1792,10 @@ public class PersistentTopic implements Topic,
AddEntryCallback {
@Override
public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
+ if (schema == null) {
+ return CompletableFuture.completedFuture(SchemaVersion.Empty);
+ }
+
String base = TopicName.get(getName()).getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
return brokerService.pulsar()
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 4ecdae4..ce023c3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -171,7 +171,7 @@ public class BrokerBkEnsemblesTests {
consumer.acknowledge(msg);
}
- PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topic1).get();
+ PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topic1, null).get();
ManagedCursorImpl cursor = (ManagedCursorImpl)
topic.getManagedLedger().getCursors().iterator().next();
retryStrategically((test) -> cursor.getState().equals("Open"), 5, 100);
@@ -206,7 +206,7 @@ public class BrokerBkEnsemblesTests {
}
// (5) Broker should create new cursor-ledger and remove old
cursor-ledger
- topic = (PersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topic1).get();
+ topic = (PersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topic1, null).get();
final ManagedCursorImpl cursor1 = (ManagedCursorImpl)
topic.getManagedLedger().getCursors().iterator().next();
retryStrategically((test) -> cursor1.getState().equals("Open"), 5,
100);
long newCursorLedgerId = cursor1.getCursorLedger();
@@ -254,7 +254,7 @@ public class BrokerBkEnsemblesTests {
Consumer<byte[]> consumer =
client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name")
.receiverQueueSize(5).subscribe();
- PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topic1).get();
+ PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topic1, null).get();
ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();
ManagedCursorImpl cursor = (ManagedCursorImpl)
ml.getCursors().iterator().next();
Field configField = ManagedCursorImpl.class.getDeclaredField("config");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 9fa40d4..b9b1b84 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -105,7 +105,7 @@ public class BrokerServiceTest extends BrokerTestBase {
BrokerService service = pulsar.getBrokerService();
final CountDownLatch latch1 = new CountDownLatch(1);
- service.getOrCreateTopic(topic).thenAccept(t -> {
+ service.getOrCreateTopic(topic, null).thenAccept(t -> {
latch1.countDown();
fail("should fail as NS is not owned");
}).exceptionally(exception -> {
@@ -118,7 +118,7 @@ public class BrokerServiceTest extends BrokerTestBase {
admin.lookups().lookupTopic(topic);
final CountDownLatch latch2 = new CountDownLatch(1);
- service.getOrCreateTopic(topic).thenAccept(t -> {
+ service.getOrCreateTopic(topic, null).thenAccept(t -> {
try {
assertNotNull(service.getTopicReference(topic));
} catch (Exception e) {
@@ -746,7 +746,8 @@ public class BrokerServiceTest extends BrokerTestBase {
pulsar.getNamespaceService().getOwnershipCache().updateBundleState(bundle,
false);
// try to create topic which should fail as bundle is disable
- CompletableFuture<Optional<Topic>> futureResult =
pulsar.getBrokerService().loadOrCreatePersistentTopic(topicName, true);
+ CompletableFuture<Optional<Topic>> futureResult =
pulsar.getBrokerService()
+ .loadOrCreatePersistentTopic(topicName, true, null);
try {
futureResult.get();
@@ -789,7 +790,7 @@ public class BrokerServiceTest extends BrokerTestBase {
// create topic async and wait on the future completion
executor.submit(() -> {
- service.getOrCreateTopic(deadLockTestTopic).thenAccept(topic ->
topicCreation.complete(null)).exceptionally(e -> {
+ service.getOrCreateTopic(deadLockTestTopic, null).thenAccept(topic
-> topicCreation.complete(null)).exceptionally(e -> {
topicCreation.completeExceptionally(e.getCause());
return null;
});
@@ -841,7 +842,7 @@ public class BrokerServiceTest extends BrokerTestBase {
// create topic async and wait on the future completion
executor.submit(() -> {
- service.getOrCreateTopic(deadLockTestTopic).thenAccept(topic ->
topicCreation.complete(null)).exceptionally(e -> {
+ service.getOrCreateTopic(deadLockTestTopic, null).thenAccept(topic
-> topicCreation.complete(null)).exceptionally(e -> {
topicCreation.completeExceptionally(e.getCause());
return null;
});