merlimat closed pull request #2959: Fixed race condition in schema 
initialization in partitioned topics
URL: https://github.com/apache/pulsar/pull/2959
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff once a pull request from a fork
is merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index b8d9a27fe7..f0e9699dd4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -53,11 +53,16 @@
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class BookkeeperSchemaStorage implements SchemaStorage {
+    private static final Logger log = 
LoggerFactory.getLogger(BookkeeperSchemaStorage.class);
+
     private static final String SchemaPath = "/schemas";
     private static final List<ACL> Acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
     private static final byte[] LedgerPassword = "".getBytes();
@@ -68,9 +73,6 @@
     private final ServiceConfiguration config;
     private BookKeeper bookKeeper;
 
-
-    private final ConcurrentMap<String, CompletableFuture<LocatorEntry>> 
locatorEntries = new ConcurrentHashMap<>();
-
     private final ConcurrentMap<String, CompletableFuture<StoredSchema>> 
readSchemaOperations = new ConcurrentHashMap<>();
 
     @VisibleForTesting
@@ -124,9 +126,15 @@ public void start() throws IOException {
     private CompletableFuture<StoredSchema> getSchema(String schemaId) {
         // There's already a schema read operation in progress. Just piggyback 
on that
         return readSchemaOperations.computeIfAbsent(schemaId, key -> {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Fetching schema from store", schemaId);
+            }
             CompletableFuture<StoredSchema> future = new CompletableFuture<>();
 
             getSchemaLocator(getSchemaPath(schemaId)).thenCompose(locator -> {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Got schema locator {}", schemaId, locator);
+                }
                 if (!locator.isPresent()) {
                     return completedFuture(null);
                 }
@@ -136,6 +144,10 @@ public void start() throws IOException {
                         .thenApply(entry -> new 
StoredSchema(entry.getSchemaData().toByteArray(),
                                 new 
LongSchemaVersion(schemaLocator.getInfo().getVersion())));
             }).handleAsync((res, ex) -> {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Get operation completed. res={} -- ex={}", 
schemaId, res, ex);
+                }
+
                 // Cleanup the pending ops from the map
                 readSchemaOperations.remove(schemaId, future);
                 if (ex != null) {
@@ -165,7 +177,14 @@ public void close() throws Exception {
 
     @NotNull
     private CompletableFuture<StoredSchema> getSchema(String schemaId, long 
version) {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Get schema - version: {}", schemaId, version);
+        }
+
         return getSchemaLocator(getSchemaPath(schemaId)).thenCompose(locator 
-> {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Get schema - version: {} - locator: {}", 
schemaId, version, locator);
+            }
 
             if (!locator.isPresent()) {
                 return completedFuture(null);
@@ -188,29 +207,116 @@ public void close() throws Exception {
 
     @NotNull
     private CompletableFuture<Long> putSchema(String schemaId, byte[] data, 
byte[] hash) {
-        return 
getOrCreateSchemaLocator(getSchemaPath(schemaId)).thenCompose(locatorEntry ->
-            addNewSchemaEntryToStore(locatorEntry.locator.getIndexList(), 
data).thenCompose(position ->
-                updateSchemaLocator(schemaId, locatorEntry, position, hash)
-            )
-        );
+        return 
getSchemaLocator(getSchemaPath(schemaId)).thenCompose(optLocatorEntry -> {
+            if (optLocatorEntry.isPresent()) {
+                // Schema locator was already present
+                return 
addNewSchemaEntryToStore(optLocatorEntry.get().locator.getIndexList(), data)
+                        .thenCompose(position -> updateSchemaLocator(schemaId, 
optLocatorEntry.get(), position, hash));
+            } else {
+                // No schema was defined yet
+                CompletableFuture<Long> future = new CompletableFuture<>();
+                createNewSchema(schemaId, data, hash)
+                        .thenAccept(version -> future.complete(version))
+                        .exceptionally(ex -> {
+                            if (ex.getCause() instanceof NodeExistsException) {
+                                // There was a race condition on the schema 
creation. Since it has now been created,
+                                // retry the whole operation so that we have a 
chance to recover without bubbling error
+                                // back to producer/consumer
+                                putSchema(schemaId, data, hash)
+                                        .thenAccept(version -> 
future.complete(version))
+                                        .exceptionally(ex2 -> {
+                                            future.completeExceptionally(ex2);
+                                            return null;
+                                        });
+                            } else {
+                                // For other errors, just fail the operation
+                                future.completeExceptionally(ex);
+                            }
+
+                            return null;
+                        });
+
+                return future;
+            }
+        });
     }
 
     @NotNull
     private CompletableFuture<Long> putSchemaIfAbsent(String schemaId, byte[] 
data, byte[] hash) {
-        return 
getOrCreateSchemaLocator(getSchemaPath(schemaId)).thenCompose(locatorEntry -> {
-            byte[] storedHash = 
locatorEntry.locator.getInfo().getHash().toByteArray();
-            if (storedHash.length > 0 && Arrays.equals(storedHash, hash)) {
-                return 
completedFuture(locatorEntry.locator.getInfo().getVersion());
-            }
-            return findSchemaEntryByHash(locatorEntry.locator.getIndexList(), 
hash).thenCompose(version -> {
-                if (isNull(version)) {
-                    return 
addNewSchemaEntryToStore(locatorEntry.locator.getIndexList(), 
data).thenCompose(position ->
-                        updateSchemaLocator(schemaId, locatorEntry, position, 
hash)
-                    );
-                } else {
-                    return completedFuture(version);
+        return 
getSchemaLocator(getSchemaPath(schemaId)).thenCompose(optLocatorEntry -> {
+
+            if (optLocatorEntry.isPresent()) {
+                // Schema locator was already present
+                SchemaStorageFormat.SchemaLocator locator = 
optLocatorEntry.get().locator;
+                byte[] storedHash = locator.getInfo().getHash().toByteArray();
+                if (storedHash.length > 0 && Arrays.equals(storedHash, hash)) {
+                    return completedFuture(locator.getInfo().getVersion());
                 }
-            });
+
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] findSchemaEntryByHash - hash={}", 
schemaId, hash);
+                }
+
+                return findSchemaEntryByHash(locator.getIndexList(), 
hash).thenCompose(version -> {
+                    if (isNull(version)) {
+                        return 
addNewSchemaEntryToStore(locator.getIndexList(), data).thenCompose(
+                                position -> updateSchemaLocator(schemaId, 
optLocatorEntry.get(), position, hash));
+                    } else {
+                        return completedFuture(version);
+                    }
+                });
+            } else {
+                // No schema was defined yet
+                CompletableFuture<Long> future = new CompletableFuture<>();
+                createNewSchema(schemaId, data, hash)
+                        .thenAccept(version -> future.complete(version))
+                        .exceptionally(ex -> {
+                            if (ex.getCause() instanceof NodeExistsException) {
+                                // There was a race condition on the schema 
creation. Since it has now been created,
+                                // retry the whole operation so that we have a 
chance to recover without bubbling error
+                                // back to producer/consumer
+                                putSchemaIfAbsent(schemaId, data, hash)
+                                        .thenAccept(version -> 
future.complete(version))
+                                        .exceptionally(ex2 -> {
+                                            future.completeExceptionally(ex2);
+                                            return null;
+                                        });
+                            } else {
+                                // For other errors, just fail the operation
+                                future.completeExceptionally(ex);
+                            }
+
+                            return null;
+                        });
+
+                return future;
+            }
+        });
+    }
+
+    private CompletableFuture<Long> createNewSchema(String schemaId, byte[] 
data, byte[] hash) {
+        SchemaStorageFormat.IndexEntry emptyIndex = 
SchemaStorageFormat.IndexEntry.newBuilder()
+                        .setVersion(0)
+                        .setHash(copyFrom(hash))
+                        
.setPosition(SchemaStorageFormat.PositionInfo.newBuilder()
+                                .setEntryId(-1L)
+                                .setLedgerId(-1L)
+                        ).build();
+
+        return addNewSchemaEntryToStore(Collections.singletonList(emptyIndex), 
data).thenCompose(position -> {
+            // The schema was stored in the ledger, now update the z-node with 
the pointer to it
+            SchemaStorageFormat.IndexEntry info = 
SchemaStorageFormat.IndexEntry.newBuilder()
+                    .setVersion(0)
+                    .setPosition(position)
+                    .setHash(copyFrom(hash))
+                    .build();
+
+            return createSchemaLocator(getSchemaPath(schemaId), 
SchemaStorageFormat.SchemaLocator.newBuilder()
+                    .setInfo(info)
+                    .addAllIndex(
+                            newArrayList(info))
+                    .build())
+                            .thenApply(ignore -> 0L);
         });
     }
 
@@ -226,7 +332,7 @@ public void close() throws Exception {
     }
 
     @NotNull
-    private String getSchemaPath(String schemaId) {
+    private static String getSchemaPath(String schemaId) {
         return SchemaPath + "/" + schemaId;
     }
 
@@ -310,8 +416,12 @@ private String getSchemaPath(String schemaId) {
             }
         }
 
-        return readSchemaEntry(index.get(0).getPosition())
-            .thenCompose(entry -> findSchemaEntryByHash(entry.getIndexList(), 
hash));
+        if (index.get(0).getPosition().getLedgerId() == -1) {
+            return completedFuture(null);
+        } else {
+            return readSchemaEntry(index.get(0).getPosition())
+                    .thenCompose(entry -> 
findSchemaEntryByHash(entry.getIndexList(), hash));
+        }
 
     }
 
@@ -319,6 +429,10 @@ private String getSchemaPath(String schemaId) {
     private CompletableFuture<SchemaStorageFormat.SchemaEntry> readSchemaEntry(
         SchemaStorageFormat.PositionInfo position
     ) {
+        if (log.isDebugEnabled()) {
+            log.debug("Reading schema entry from {}", position);
+        }
+
         return openLedger(position.getLedgerId())
             .thenCompose((ledger) ->
                 Functions.getLedgerEntry(ledger, position.getEntryId())
@@ -342,6 +456,24 @@ private String getSchemaPath(String schemaId) {
         return future;
     }
 
+    @NotNull
+    private CompletableFuture<LocatorEntry> createSchemaLocator(String id, 
SchemaStorageFormat.SchemaLocator locator) {
+        CompletableFuture<LocatorEntry> future = new CompletableFuture<>();
+
+        ZkUtils.asyncCreateFullPathOptimistic(zooKeeper, id, 
locator.toByteArray(), Acl,
+                CreateMode.PERSISTENT, (rc, path, ctx, name) -> {
+                    Code code = Code.get(rc);
+                    if (code != Code.OK) {
+                        
future.completeExceptionally(KeeperException.create(code));
+                    } else {
+                        // Newly created z-node will have version 0
+                        future.complete(new LocatorEntry(locator, 0));
+                    }
+                }, null);
+
+        return future;
+    }
+
     @NotNull
     private CompletableFuture<Optional<LocatorEntry>> getSchemaLocator(String 
schema) {
         return localZkCache.getEntryAsync(schema, new 
SchemaLocatorDeserializer()).thenApply(optional ->
@@ -349,58 +481,13 @@ private String getSchemaPath(String schemaId) {
         );
     }
 
-    @NotNull
-    private CompletableFuture<LocatorEntry> getOrCreateSchemaLocator(String 
schema) {
-        // Protect from concurrent schema locator creation
-        return locatorEntries.computeIfAbsent(schema, key -> {
-            CompletableFuture<LocatorEntry> future = new CompletableFuture<>();
-
-            getSchemaLocator(schema).thenCompose(schemaLocatorStatEntry -> {
-                if (schemaLocatorStatEntry.isPresent()) {
-                    return completedFuture(schemaLocatorStatEntry.get());
-                } else {
-                    SchemaStorageFormat.SchemaLocator locator = 
SchemaStorageFormat.SchemaLocator.newBuilder()
-                            
.setInfo(SchemaStorageFormat.IndexEntry.newBuilder().setVersion(-1L)
-                                    
.setHash(ByteString.EMPTY).setPosition(SchemaStorageFormat.PositionInfo.newBuilder()
-                                            .setEntryId(-1L).setLedgerId(-1L)))
-                            .build();
-
-                    CompletableFuture<LocatorEntry> zkFuture = new 
CompletableFuture<>();
-
-                    ZkUtils.asyncCreateFullPathOptimistic(zooKeeper, schema, 
locator.toByteArray(), Acl,
-                            CreateMode.PERSISTENT, (rc, path, ctx, name) -> {
-                                Code code = Code.get(rc);
-                                if (code != Code.OK) {
-                                    
zkFuture.completeExceptionally(KeeperException.create(code));
-                                } else {
-                                    zkFuture.complete(new 
LocatorEntry(locator, -1));
-                                }
-                            }, null);
-
-                    return zkFuture;
-                }
-            }).handleAsync((res, ex) -> {
-                // Cleanup the pending ops from the map
-                locatorEntries.remove(schema, future);
-                if (ex != null) {
-                    future.completeExceptionally(ex);
-                } else {
-                    future.complete(res);
-                }
-                return null;
-            });
-
-            return future;
-        });
-    }
-
     @NotNull
     private CompletableFuture<Long> addEntry(LedgerHandle ledgerHandle, 
SchemaStorageFormat.SchemaEntry entry) {
         final CompletableFuture<Long> future = new CompletableFuture<>();
         ledgerHandle.asyncAddEntry(entry.toByteArray(),
             (rc, handle, entryId, ctx) -> {
                 if (rc != BKException.Code.OK) {
-                    future.completeExceptionally(BKException.create(rc));
+                    future.completeExceptionally(bkException("Failed to add 
entry", rc, ledgerHandle.getId(), -1));
                 } else {
                     future.complete(entryId);
                 }
@@ -420,7 +507,7 @@ private String getSchemaPath(String schemaId) {
             LedgerPassword,
             (rc, handle, ctx) -> {
                 if (rc != BKException.Code.OK) {
-                    future.completeExceptionally(BKException.create(rc));
+                    future.completeExceptionally(bkException("Failed to create 
ledger", rc, -1, -1));
                 } else {
                     future.complete(handle);
                 }
@@ -438,7 +525,7 @@ private String getSchemaPath(String schemaId) {
             LedgerPassword,
             (rc, handle, ctx) -> {
                 if (rc != BKException.Code.OK) {
-                    future.completeExceptionally(BKException.create(rc));
+                    future.completeExceptionally(bkException("Failed to open 
ledger", rc, ledgerId, -1));
                 } else {
                     future.complete(handle);
                 }
@@ -452,7 +539,7 @@ private String getSchemaPath(String schemaId) {
         CompletableFuture<Void> future = new CompletableFuture<>();
         ledgerHandle.asyncClose((rc, handle, ctx) -> {
             if (rc != BKException.Code.OK) {
-                future.completeExceptionally(BKException.create(rc));
+                future.completeExceptionally(bkException("Failed to close 
ledger", rc, ledgerHandle.getId(), -1));
             } else {
                 future.complete(null);
             }
@@ -466,7 +553,7 @@ private String getSchemaPath(String schemaId) {
             ledger.asyncReadEntries(entry, entry,
                 (rc, handle, entries, ctx) -> {
                     if (rc != BKException.Code.OK) {
-                        future.completeExceptionally(BKException.create(rc));
+                        future.completeExceptionally(bkException("Failed to 
read entry", rc, ledger.getId(), entry));
                     } else {
                         future.complete(entries.nextElement());
                     }
@@ -519,4 +606,13 @@ private String getSchemaPath(String schemaId) {
             this.zkZnodeVersion = zkZnodeVersion;
         }
     }
+
+    public static Exception bkException(String operation, int rc, long 
ledgerId, long entryId) {
+        String message = 
org.apache.bookkeeper.client.api.BKException.getMessage(rc) + " -  ledger=" + 
ledgerId;
+
+        if (entryId != -1) {
+            message += " - entry=" + entryId;
+        }
+        return new IOException(message);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/PartitionedTopicsSchemaTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/PartitionedTopicsSchemaTest.java
index e7723d7754..6e9b1220a5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/PartitionedTopicsSchemaTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/PartitionedTopicsSchemaTest.java
@@ -38,10 +38,8 @@
 
     /**
      * Test that sequence id from a producer is correct when there are send 
errors
-     *
-     * the test is disabled {@link 
https://github.com/apache/pulsar/issues/2651}
      */
-    @Test(enabled = false)
+    @Test
     public void partitionedTopicWithSchema() throws Exception {
         admin.namespaces().createNamespace("prop/my-test", 
Collections.singleton("usc"));
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index a6f40fe1c3..3b87e4e086 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -126,15 +126,15 @@ private void start() {
                 if (completed.incrementAndGet() == 
topicMetadata.numPartitions()) {
                     if (createFail.get() == null) {
                         setState(State.Ready);
-                        
producerCreatedFuture().complete(PartitionedProducerImpl.this);
                         log.info("[{}] Created partitioned producer", topic);
+                        
producerCreatedFuture().complete(PartitionedProducerImpl.this);
                     } else {
+                        log.error("[{}] Could not create partitioned 
producer.", topic, createFail.get().getCause());
                         closeAsync().handle((ok, closeException) -> {
                             
producerCreatedFuture().completeExceptionally(createFail.get());
                             client.cleanupProducer(this);
                             return null;
                         });
-                        log.error("[{}] Could not create partitioned 
producer.", topic, createFail.get().getCause());
                     }
                 }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to