This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ec2e16  Add deleted schema judgment when adding schema  (#4731)
6ec2e16 is described below

commit 6ec2e16b6bd68ad51e2841277164a53fcfb1260b
Author: congbo <[email protected]>
AuthorDate: Sat Jul 20 00:52:07 2019 +0800

    Add deleted schema judgment when adding schema  (#4731)
    
    ### Motivation
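    Fixes #4724. A schema identical to one that was previously deleted is now
    stored as a new version instead of being matched to the deleted entry.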
    
    ### Verifying this change
    Added the `deleteSchemaAndAddSchema` test to `SchemaServiceTest`.
---
 .../service/schema/BookkeeperSchemaStorage.java    |  24 +++--
 .../service/schema/SchemaRegistryServiceImpl.java  | 101 ++++++++++++++-------
 .../broker/service/schema/SchemaStorage.java       |   2 +-
 .../broker/service/schema/SchemaServiceTest.java   |  17 ++++
 4 files changed, 102 insertions(+), 42 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index c5d67bd..78d3e8a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -25,6 +25,7 @@ import static java.util.Objects.isNull;
 import static java.util.Objects.nonNull;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.Functions.newSchemaEntry;
+import static 
org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.NO_DELETED_VERSION;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -107,8 +108,8 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
     }
 
     @Override
-    public CompletableFuture<SchemaVersion> put(String key, byte[] value, 
byte[] hash) {
-        return putSchemaIfAbsent(key, value, 
hash).thenApply(LongSchemaVersion::new);
+    public CompletableFuture<SchemaVersion> put(String key, byte[] value, 
byte[] hash, long maxDeletedVersion) {
+        return putSchemaIfAbsent(key, value, hash, 
maxDeletedVersion).thenApply(LongSchemaVersion::new);
     }
 
     @Override
@@ -139,7 +140,7 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
                 .thenApply(entry -> new StoredSchema
                     (
                         entry.getSchemaData().toByteArray(),
-                        new 
LongSchemaVersion(schemaLocator.getInfo().getVersion())
+                        new LongSchemaVersion(indexEntry.getVersion())
                     )
                 )
             ));
@@ -279,7 +280,7 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
     }
 
     @NotNull
-    private CompletableFuture<Long> putSchemaIfAbsent(String schemaId, byte[] 
data, byte[] hash) {
+    private CompletableFuture<Long> putSchemaIfAbsent(String schemaId, byte[] 
data, byte[] hash, long maxDeletedVersion) {
         return 
getSchemaLocator(getSchemaPath(schemaId)).thenCompose(optLocatorEntry -> {
 
             if (optLocatorEntry.isPresent()) {
@@ -294,7 +295,7 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
                     log.debug("[{}] findSchemaEntryByHash - hash={}", 
schemaId, hash);
                 }
 
-                return findSchemaEntryByHash(locator.getIndexList(), 
hash).thenCompose(version -> {
+                return findSchemaEntryByHash(locator.getIndexList(), hash, 
maxDeletedVersion).thenCompose(version -> {
                     if (isNull(version)) {
                         return addNewSchemaEntryToStore(schemaId, 
locator.getIndexList(), data).thenCompose(
                                 position -> updateSchemaLocator(schemaId, 
optLocatorEntry.get(), position, hash));
@@ -312,7 +313,7 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
                                 // There was a race condition on the schema 
creation. Since it has now been created,
                                 // retry the whole operation so that we have a 
chance to recover without bubbling error
                                 // back to producer/consumer
-                                putSchemaIfAbsent(schemaId, data, hash)
+                                putSchemaIfAbsent(schemaId, data, hash, 
NO_DELETED_VERSION)
                                         .thenAccept(version -> 
future.complete(version))
                                         .exceptionally(ex2 -> {
                                             future.completeExceptionally(ex2);
@@ -441,7 +442,8 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
     @NotNull
     private CompletableFuture<Long> findSchemaEntryByHash(
         List<SchemaStorageFormat.IndexEntry> index,
-        byte[] hash
+        byte[] hash,
+        long maxDeletedVersion
     ) {
 
         if (index.isEmpty()) {
@@ -450,7 +452,11 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
 
         for (SchemaStorageFormat.IndexEntry entry : index) {
             if (Arrays.equals(entry.getHash().toByteArray(), hash)) {
-                return completedFuture(entry.getVersion());
+                if (entry.getVersion() > maxDeletedVersion) {
+                    return completedFuture(entry.getVersion());
+                } else {
+                    return completedFuture(null);
+                }
             }
         }
 
@@ -458,7 +464,7 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
             return completedFuture(null);
         } else {
             return readSchemaEntry(index.get(0).getPosition())
-                    .thenCompose(entry -> 
findSchemaEntryByHash(entry.getIndexList(), hash));
+                    .thenCompose(entry -> 
findSchemaEntryByHash(entry.getIndexList(), hash, maxDeletedVersion));
         }
 
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 4014365..e2975a1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.broker.service.schema;
 
 import static java.util.Objects.isNull;
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE;
+import static 
org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy.FORWARD_TRANSITIVE;
+import static 
org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy.FULL_TRANSITIVE;
 import static 
org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toMap;
 import static 
org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs;
 
@@ -43,6 +46,7 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -52,6 +56,7 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
     private final Map<SchemaType, SchemaCompatibilityCheck> 
compatibilityChecks;
     private final SchemaStorage schemaStorage;
     private final Clock clock;
+    protected static final long NO_DELETED_VERSION = -1L;
 
     @VisibleForTesting
     SchemaRegistryServiceImpl(SchemaStorage schemaStorage, Map<SchemaType, 
SchemaCompatibilityCheck> compatibilityChecks, Clock clock) {
@@ -106,19 +111,33 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
     @NotNull
     public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, 
SchemaData schema,
                                                               
SchemaCompatibilityStrategy strategy) {
-        return getSchema(schemaId)
+        return getSchema(schemaId, SchemaVersion.Latest)
             .thenCompose(
                 (existingSchema) ->
                 {
-                    if (existingSchema == null || 
existingSchema.schema.isDeleted()) {
-                        return completedFuture(true);
+                    CompletableFuture<KeyValue<Long, Boolean>> keyValue;
+                    if (existingSchema == null) {
+                        keyValue = completedFuture(new 
KeyValue<>(NO_DELETED_VERSION, true));
+                    } else if (existingSchema.schema.isDeleted()) {
+                        keyValue = completedFuture(new 
KeyValue<>(((LongSchemaVersion)schemaStorage
+                                
.versionFromBytes(existingSchema.version.bytes())).getVersion(), true));
                     } else {
-                        return isCompatible(schemaId, schema, strategy);
+                        if (isTransitiveStrategy(strategy)) {
+                            keyValue = checkCompatibilityWithAll(schemaId, 
schema, strategy);
+
+                        } else {
+                            keyValue = 
trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList ->
+                                    
completedFuture(((LongSchemaVersion)schemaStorage
+                                    
.versionFromBytes(schemaAndMetadataList.get(0).version.bytes())).getVersion() - 
1L))
+                                    .thenCompose(maxDeleteVersion -> 
isCompatible(schemaId, schema, strategy)
+                                            .thenCompose(isCompatible -> 
completedFuture(new KeyValue<>(maxDeleteVersion, isCompatible))));
+                        }
                     }
+                    return keyValue;
                 }
             )
-            .thenCompose(isCompatible -> {
-                    if (isCompatible) {
+            .thenCompose(keyValue -> {
+                    if (keyValue.getValue()) {
                         byte[] context = 
hashFunction.hashBytes(schema.getData()).asBytes();
                         SchemaRegistryFormat.SchemaInfo info = 
SchemaRegistryFormat.SchemaInfo.newBuilder()
                             
.setType(Functions.convertFromDomainType(schema.getType()))
@@ -129,7 +148,9 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
                             .setTimestamp(clock.millis())
                             .addAllProps(toPairs(schema.getProps()))
                             .build();
-                        return schemaStorage.put(schemaId, info.toByteArray(), 
context);
+                        
+                        return schemaStorage.put(schemaId, info.toByteArray(), 
context, keyValue.getKey());
+
                     } else {
                         return FutureUtil.failedFuture(new 
IncompatibleSchemaException());
                     }
@@ -140,7 +161,19 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
     @NotNull
     public CompletableFuture<SchemaVersion> deleteSchema(String schemaId, 
String user) {
         byte[] deletedEntry = deleted(schemaId, user).toByteArray();
-        return schemaStorage.put(schemaId, deletedEntry, new byte[]{});
+        return 
trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList ->
+                schemaStorage.put(schemaId, deletedEntry, new byte[]{}, 
((LongSchemaVersion)schemaStorage
+                
.versionFromBytes(schemaAndMetadataList.get(0).version.bytes())).getVersion() - 
1L));
+
+    }
+
+    private static boolean isTransitiveStrategy(SchemaCompatibilityStrategy 
strategy) {
+        if (FORWARD_TRANSITIVE.equals(strategy)
+                || BACKWARD_TRANSITIVE.equals(strategy)
+                || FULL_TRANSITIVE.equals(strategy)) {
+            return true;
+        }
+        return false;
     }
 
     @Override
@@ -150,7 +183,8 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
             case FORWARD_TRANSITIVE:
             case BACKWARD_TRANSITIVE:
             case FULL_TRANSITIVE:
-                return checkCompatibilityWithAll(schemaId, schema, strategy);
+                return checkCompatibilityWithAll(schemaId, schema, strategy)
+                        .thenCompose(keyValue -> 
completedFuture(keyValue.getValue()));
             default:
                 return checkCompatibilityWithLatest(schemaId, schema, 
strategy);
         }
@@ -195,30 +229,33 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
                             && isCompatible(existingSchema, schema, strategy));
     }
 
-    private CompletableFuture<Boolean> checkCompatibilityWithAll(String 
schemaId, SchemaData schema,
-                                                                 
SchemaCompatibilityStrategy strategy) {
-        return getAllSchemas(schemaId)
-            .thenCompose(FutureUtils::collect)
-            .thenApply(list -> {
-                    // Trim the prefix of schemas before the latest delete.
-                    int lastIndex = list.size() - 1;
-                    for (int i = lastIndex; i >= 0; i--) {
-                        if (list.get(i).schema.isDeleted()) {
-                            if (i == lastIndex) { // if the latest schema is a 
delete, there's no schemas to compare
-                                return 
Collections.<SchemaAndMetadata>emptyList();
-                            } else {
-                                return list.subList(i + 1, list.size());
-                            }
-                        }
+    private CompletableFuture<KeyValue<Long, Boolean>> 
checkCompatibilityWithAll(String schemaId, SchemaData schema,
+                                                                               
  SchemaCompatibilityStrategy strategy) {
+        return 
trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList ->
+                 completedFuture(new 
KeyValue<>(((LongSchemaVersion)schemaStorage.versionFromBytes(schemaAndMetadataList.get(0).version.bytes())).getVersion()
 - 1L,
+                        compatibilityChecks.getOrDefault(schema.getType(), 
SchemaCompatibilityCheck.DEFAULT)
+                        .isCompatible(schemaAndMetadataList
+                                .stream()
+                                .map(schemaAndMetadata -> 
schemaAndMetadata.schema)
+                                .collect(Collectors.toList()), schema, 
strategy)))
+            );
+    }
+
+    private CompletableFuture<List<SchemaAndMetadata>> 
trimDeletedSchemaAndGetList(String schemaId) {
+        return 
getAllSchemas(schemaId).thenCompose(FutureUtils::collect).thenApply(list -> {
+            // Trim the prefix of schemas before the latest delete.
+            int lastIndex = list.size() - 1;
+            for (int i = lastIndex; i >= 0; i--) {
+                if (list.get(i).schema.isDeleted()) {
+                    if (i == lastIndex) { // if the latest schema is a delete, 
there's no schemas to compare
+                        return Collections.<SchemaAndMetadata>emptyList();
+                    } else {
+                        return list.subList(i + 1, list.size());
                     }
-                    return list;
-                })
-            .thenApply(schemaAndMetadataList -> schemaAndMetadataList
-                       .stream()
-                       .map(schemaAndMetadata -> schemaAndMetadata.schema)
-                       .collect(Collectors.toList()))
-            .thenApply(schemas -> 
compatibilityChecks.getOrDefault(schema.getType(), 
SchemaCompatibilityCheck.DEFAULT)
-                       .isCompatible(schemas, schema, strategy));
+                }
+            }
+            return list;
+        });
     }
 
     interface Functions {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
index f133666..e59fa33 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
@@ -24,7 +24,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 
 public interface SchemaStorage {
 
-    CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] 
hash);
+    CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] 
hash, long maxDeletedVersion);
 
     CompletableFuture<StoredSchema> get(String key, SchemaVersion version);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
index ffe7532..d998fdd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -111,6 +111,23 @@ public class SchemaServiceTest extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test
+    public void deleteSchemaAndAddSchema() throws Exception {
+        putSchema(schemaId1, schema1, version(0));
+        SchemaData latest = getLatestSchema(schemaId1, version(0));
+        assertEquals(schema1, latest);
+
+        deleteSchema(schemaId1, version(1));
+
+        assertNull(schemaRegistryService.getSchema(schemaId1).get());
+
+        putSchema(schemaId1, schema1, version(2));
+
+        latest = getLatestSchema(schemaId1, version(2));
+        assertEquals(schema1, latest);
+
+    }
+
+    @Test
     public void getReturnsTheLastWrittenEntry() throws Exception {
         putSchema(schemaId1, schema1, version(0));
         putSchema(schemaId1, schema2, version(1));

Reply via email to