merlimat closed pull request #2959: Fixed race condition in schema
initialization in partitioned topics
URL: https://github.com/apache/pulsar/pull/2959
This is a pull request merged from a forked repository. Because GitHub
hides the original diff once a PR from a fork is merged, the diff is
reproduced below for the sake of provenance:
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index b8d9a27fe7..f0e9699dd4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -53,11 +53,16 @@
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class BookkeeperSchemaStorage implements SchemaStorage {
+ private static final Logger log =
LoggerFactory.getLogger(BookkeeperSchemaStorage.class);
+
private static final String SchemaPath = "/schemas";
private static final List<ACL> Acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
private static final byte[] LedgerPassword = "".getBytes();
@@ -68,9 +73,6 @@
private final ServiceConfiguration config;
private BookKeeper bookKeeper;
-
- private final ConcurrentMap<String, CompletableFuture<LocatorEntry>>
locatorEntries = new ConcurrentHashMap<>();
-
private final ConcurrentMap<String, CompletableFuture<StoredSchema>>
readSchemaOperations = new ConcurrentHashMap<>();
@VisibleForTesting
@@ -124,9 +126,15 @@ public void start() throws IOException {
private CompletableFuture<StoredSchema> getSchema(String schemaId) {
// There's already a schema read operation in progress. Just piggyback
on that
return readSchemaOperations.computeIfAbsent(schemaId, key -> {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Fetching schema from store", schemaId);
+ }
CompletableFuture<StoredSchema> future = new CompletableFuture<>();
getSchemaLocator(getSchemaPath(schemaId)).thenCompose(locator -> {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Got schema locator {}", schemaId, locator);
+ }
if (!locator.isPresent()) {
return completedFuture(null);
}
@@ -136,6 +144,10 @@ public void start() throws IOException {
.thenApply(entry -> new
StoredSchema(entry.getSchemaData().toByteArray(),
new
LongSchemaVersion(schemaLocator.getInfo().getVersion())));
}).handleAsync((res, ex) -> {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Get operation completed. res={} -- ex={}",
schemaId, res, ex);
+ }
+
// Cleanup the pending ops from the map
readSchemaOperations.remove(schemaId, future);
if (ex != null) {
@@ -165,7 +177,14 @@ public void close() throws Exception {
@NotNull
private CompletableFuture<StoredSchema> getSchema(String schemaId, long
version) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Get schema - version: {}", schemaId, version);
+ }
+
return getSchemaLocator(getSchemaPath(schemaId)).thenCompose(locator
-> {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Get schema - version: {} - locator: {}",
schemaId, version, locator);
+ }
if (!locator.isPresent()) {
return completedFuture(null);
@@ -188,29 +207,116 @@ public void close() throws Exception {
@NotNull
private CompletableFuture<Long> putSchema(String schemaId, byte[] data,
byte[] hash) {
- return
getOrCreateSchemaLocator(getSchemaPath(schemaId)).thenCompose(locatorEntry ->
- addNewSchemaEntryToStore(locatorEntry.locator.getIndexList(),
data).thenCompose(position ->
- updateSchemaLocator(schemaId, locatorEntry, position, hash)
- )
- );
+ return
getSchemaLocator(getSchemaPath(schemaId)).thenCompose(optLocatorEntry -> {
+ if (optLocatorEntry.isPresent()) {
+ // Schema locator was already present
+ return
addNewSchemaEntryToStore(optLocatorEntry.get().locator.getIndexList(), data)
+ .thenCompose(position -> updateSchemaLocator(schemaId,
optLocatorEntry.get(), position, hash));
+ } else {
+ // No schema was defined yet
+ CompletableFuture<Long> future = new CompletableFuture<>();
+ createNewSchema(schemaId, data, hash)
+ .thenAccept(version -> future.complete(version))
+ .exceptionally(ex -> {
+ if (ex.getCause() instanceof NodeExistsException) {
+ // There was a race condition on the schema
creation. Since it has now been created,
+ // retry the whole operation so that we have a
chance to recover without bubbling error
+ // back to producer/consumer
+ putSchema(schemaId, data, hash)
+ .thenAccept(version ->
future.complete(version))
+ .exceptionally(ex2 -> {
+ future.completeExceptionally(ex2);
+ return null;
+ });
+ } else {
+ // For other errors, just fail the operation
+ future.completeExceptionally(ex);
+ }
+
+ return null;
+ });
+
+ return future;
+ }
+ });
}
@NotNull
private CompletableFuture<Long> putSchemaIfAbsent(String schemaId, byte[]
data, byte[] hash) {
- return
getOrCreateSchemaLocator(getSchemaPath(schemaId)).thenCompose(locatorEntry -> {
- byte[] storedHash =
locatorEntry.locator.getInfo().getHash().toByteArray();
- if (storedHash.length > 0 && Arrays.equals(storedHash, hash)) {
- return
completedFuture(locatorEntry.locator.getInfo().getVersion());
- }
- return findSchemaEntryByHash(locatorEntry.locator.getIndexList(),
hash).thenCompose(version -> {
- if (isNull(version)) {
- return
addNewSchemaEntryToStore(locatorEntry.locator.getIndexList(),
data).thenCompose(position ->
- updateSchemaLocator(schemaId, locatorEntry, position,
hash)
- );
- } else {
- return completedFuture(version);
+ return
getSchemaLocator(getSchemaPath(schemaId)).thenCompose(optLocatorEntry -> {
+
+ if (optLocatorEntry.isPresent()) {
+ // Schema locator was already present
+ SchemaStorageFormat.SchemaLocator locator =
optLocatorEntry.get().locator;
+ byte[] storedHash = locator.getInfo().getHash().toByteArray();
+ if (storedHash.length > 0 && Arrays.equals(storedHash, hash)) {
+ return completedFuture(locator.getInfo().getVersion());
}
- });
+
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] findSchemaEntryByHash - hash={}",
schemaId, hash);
+ }
+
+ return findSchemaEntryByHash(locator.getIndexList(),
hash).thenCompose(version -> {
+ if (isNull(version)) {
+ return
addNewSchemaEntryToStore(locator.getIndexList(), data).thenCompose(
+ position -> updateSchemaLocator(schemaId,
optLocatorEntry.get(), position, hash));
+ } else {
+ return completedFuture(version);
+ }
+ });
+ } else {
+ // No schema was defined yet
+ CompletableFuture<Long> future = new CompletableFuture<>();
+ createNewSchema(schemaId, data, hash)
+ .thenAccept(version -> future.complete(version))
+ .exceptionally(ex -> {
+ if (ex.getCause() instanceof NodeExistsException) {
+ // There was a race condition on the schema
creation. Since it has now been created,
+ // retry the whole operation so that we have a
chance to recover without bubbling error
+ // back to producer/consumer
+ putSchemaIfAbsent(schemaId, data, hash)
+ .thenAccept(version ->
future.complete(version))
+ .exceptionally(ex2 -> {
+ future.completeExceptionally(ex2);
+ return null;
+ });
+ } else {
+ // For other errors, just fail the operation
+ future.completeExceptionally(ex);
+ }
+
+ return null;
+ });
+
+ return future;
+ }
+ });
+ }
+
+ private CompletableFuture<Long> createNewSchema(String schemaId, byte[]
data, byte[] hash) {
+ SchemaStorageFormat.IndexEntry emptyIndex =
SchemaStorageFormat.IndexEntry.newBuilder()
+ .setVersion(0)
+ .setHash(copyFrom(hash))
+
.setPosition(SchemaStorageFormat.PositionInfo.newBuilder()
+ .setEntryId(-1L)
+ .setLedgerId(-1L)
+ ).build();
+
+ return addNewSchemaEntryToStore(Collections.singletonList(emptyIndex),
data).thenCompose(position -> {
+ // The schema was stored in the ledger, now update the z-node with
the pointer to it
+ SchemaStorageFormat.IndexEntry info =
SchemaStorageFormat.IndexEntry.newBuilder()
+ .setVersion(0)
+ .setPosition(position)
+ .setHash(copyFrom(hash))
+ .build();
+
+ return createSchemaLocator(getSchemaPath(schemaId),
SchemaStorageFormat.SchemaLocator.newBuilder()
+ .setInfo(info)
+ .addAllIndex(
+ newArrayList(info))
+ .build())
+ .thenApply(ignore -> 0L);
});
}
@@ -226,7 +332,7 @@ public void close() throws Exception {
}
@NotNull
- private String getSchemaPath(String schemaId) {
+ private static String getSchemaPath(String schemaId) {
return SchemaPath + "/" + schemaId;
}
@@ -310,8 +416,12 @@ private String getSchemaPath(String schemaId) {
}
}
- return readSchemaEntry(index.get(0).getPosition())
- .thenCompose(entry -> findSchemaEntryByHash(entry.getIndexList(),
hash));
+ if (index.get(0).getPosition().getLedgerId() == -1) {
+ return completedFuture(null);
+ } else {
+ return readSchemaEntry(index.get(0).getPosition())
+ .thenCompose(entry ->
findSchemaEntryByHash(entry.getIndexList(), hash));
+ }
}
@@ -319,6 +429,10 @@ private String getSchemaPath(String schemaId) {
private CompletableFuture<SchemaStorageFormat.SchemaEntry> readSchemaEntry(
SchemaStorageFormat.PositionInfo position
) {
+ if (log.isDebugEnabled()) {
+ log.debug("Reading schema entry from {}", position);
+ }
+
return openLedger(position.getLedgerId())
.thenCompose((ledger) ->
Functions.getLedgerEntry(ledger, position.getEntryId())
@@ -342,6 +456,24 @@ private String getSchemaPath(String schemaId) {
return future;
}
+ @NotNull
+ private CompletableFuture<LocatorEntry> createSchemaLocator(String id,
SchemaStorageFormat.SchemaLocator locator) {
+ CompletableFuture<LocatorEntry> future = new CompletableFuture<>();
+
+ ZkUtils.asyncCreateFullPathOptimistic(zooKeeper, id,
locator.toByteArray(), Acl,
+ CreateMode.PERSISTENT, (rc, path, ctx, name) -> {
+ Code code = Code.get(rc);
+ if (code != Code.OK) {
+
future.completeExceptionally(KeeperException.create(code));
+ } else {
+ // Newly created z-node will have version 0
+ future.complete(new LocatorEntry(locator, 0));
+ }
+ }, null);
+
+ return future;
+ }
+
@NotNull
private CompletableFuture<Optional<LocatorEntry>> getSchemaLocator(String
schema) {
return localZkCache.getEntryAsync(schema, new
SchemaLocatorDeserializer()).thenApply(optional ->
@@ -349,58 +481,13 @@ private String getSchemaPath(String schemaId) {
);
}
- @NotNull
- private CompletableFuture<LocatorEntry> getOrCreateSchemaLocator(String
schema) {
- // Protect from concurrent schema locator creation
- return locatorEntries.computeIfAbsent(schema, key -> {
- CompletableFuture<LocatorEntry> future = new CompletableFuture<>();
-
- getSchemaLocator(schema).thenCompose(schemaLocatorStatEntry -> {
- if (schemaLocatorStatEntry.isPresent()) {
- return completedFuture(schemaLocatorStatEntry.get());
- } else {
- SchemaStorageFormat.SchemaLocator locator =
SchemaStorageFormat.SchemaLocator.newBuilder()
-
.setInfo(SchemaStorageFormat.IndexEntry.newBuilder().setVersion(-1L)
-
.setHash(ByteString.EMPTY).setPosition(SchemaStorageFormat.PositionInfo.newBuilder()
- .setEntryId(-1L).setLedgerId(-1L)))
- .build();
-
- CompletableFuture<LocatorEntry> zkFuture = new
CompletableFuture<>();
-
- ZkUtils.asyncCreateFullPathOptimistic(zooKeeper, schema,
locator.toByteArray(), Acl,
- CreateMode.PERSISTENT, (rc, path, ctx, name) -> {
- Code code = Code.get(rc);
- if (code != Code.OK) {
-
zkFuture.completeExceptionally(KeeperException.create(code));
- } else {
- zkFuture.complete(new
LocatorEntry(locator, -1));
- }
- }, null);
-
- return zkFuture;
- }
- }).handleAsync((res, ex) -> {
- // Cleanup the pending ops from the map
- locatorEntries.remove(schema, future);
- if (ex != null) {
- future.completeExceptionally(ex);
- } else {
- future.complete(res);
- }
- return null;
- });
-
- return future;
- });
- }
-
@NotNull
private CompletableFuture<Long> addEntry(LedgerHandle ledgerHandle,
SchemaStorageFormat.SchemaEntry entry) {
final CompletableFuture<Long> future = new CompletableFuture<>();
ledgerHandle.asyncAddEntry(entry.toByteArray(),
(rc, handle, entryId, ctx) -> {
if (rc != BKException.Code.OK) {
- future.completeExceptionally(BKException.create(rc));
+ future.completeExceptionally(bkException("Failed to add
entry", rc, ledgerHandle.getId(), -1));
} else {
future.complete(entryId);
}
@@ -420,7 +507,7 @@ private String getSchemaPath(String schemaId) {
LedgerPassword,
(rc, handle, ctx) -> {
if (rc != BKException.Code.OK) {
- future.completeExceptionally(BKException.create(rc));
+ future.completeExceptionally(bkException("Failed to create
ledger", rc, -1, -1));
} else {
future.complete(handle);
}
@@ -438,7 +525,7 @@ private String getSchemaPath(String schemaId) {
LedgerPassword,
(rc, handle, ctx) -> {
if (rc != BKException.Code.OK) {
- future.completeExceptionally(BKException.create(rc));
+ future.completeExceptionally(bkException("Failed to open
ledger", rc, ledgerId, -1));
} else {
future.complete(handle);
}
@@ -452,7 +539,7 @@ private String getSchemaPath(String schemaId) {
CompletableFuture<Void> future = new CompletableFuture<>();
ledgerHandle.asyncClose((rc, handle, ctx) -> {
if (rc != BKException.Code.OK) {
- future.completeExceptionally(BKException.create(rc));
+ future.completeExceptionally(bkException("Failed to close
ledger", rc, ledgerHandle.getId(), -1));
} else {
future.complete(null);
}
@@ -466,7 +553,7 @@ private String getSchemaPath(String schemaId) {
ledger.asyncReadEntries(entry, entry,
(rc, handle, entries, ctx) -> {
if (rc != BKException.Code.OK) {
- future.completeExceptionally(BKException.create(rc));
+ future.completeExceptionally(bkException("Failed to
read entry", rc, ledger.getId(), entry));
} else {
future.complete(entries.nextElement());
}
@@ -519,4 +606,13 @@ private String getSchemaPath(String schemaId) {
this.zkZnodeVersion = zkZnodeVersion;
}
}
+
+ public static Exception bkException(String operation, int rc, long
ledgerId, long entryId) {
+ String message =
org.apache.bookkeeper.client.api.BKException.getMessage(rc) + " - ledger=" +
ledgerId;
+
+ if (entryId != -1) {
+ message += " - entry=" + entryId;
+ }
+ return new IOException(message);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/PartitionedTopicsSchemaTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/PartitionedTopicsSchemaTest.java
index e7723d7754..6e9b1220a5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/PartitionedTopicsSchemaTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/PartitionedTopicsSchemaTest.java
@@ -38,10 +38,8 @@
/**
* Test that sequence id from a producer is correct when there are send
errors
- *
- * the test is disabled {@link
https://github.com/apache/pulsar/issues/2651}
*/
- @Test(enabled = false)
+ @Test
public void partitionedTopicWithSchema() throws Exception {
admin.namespaces().createNamespace("prop/my-test",
Collections.singleton("usc"));
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index a6f40fe1c3..3b87e4e086 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -126,15 +126,15 @@ private void start() {
if (completed.incrementAndGet() ==
topicMetadata.numPartitions()) {
if (createFail.get() == null) {
setState(State.Ready);
-
producerCreatedFuture().complete(PartitionedProducerImpl.this);
log.info("[{}] Created partitioned producer", topic);
+
producerCreatedFuture().complete(PartitionedProducerImpl.this);
} else {
+ log.error("[{}] Could not create partitioned
producer.", topic, createFail.get().getCause());
closeAsync().handle((ok, closeException) -> {
producerCreatedFuture().completeExceptionally(createFail.get());
client.cleanupProducer(this);
return null;
});
- log.error("[{}] Could not create partitioned
producer.", topic, createFail.get().getCause());
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services