This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6ec2e16 Add deleted schema judgment when adding schema (#4731)
6ec2e16 is described below
commit 6ec2e16b6bd68ad51e2841277164a53fcfb1260b
Author: congbo <[email protected]>
AuthorDate: Sat Jul 20 00:52:07 2019 +0800
Add deleted schema judgment when adding schema (#4731)
### Motivation
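Fixes #4724. When a schema is added after a delete, a stored entry with the same hash whose version is not newer than the latest deleted version is now ignored, so the schema is written as a new version instead of resolving to the pre-deletion one.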
### Verifying this change
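Added the test `SchemaServiceTest#deleteSchemaAndAddSchema`, which puts a schema (version 0), deletes it (version 1), and puts the same schema again, expecting it to be stored as version 2.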
---
.../service/schema/BookkeeperSchemaStorage.java | 24 +++--
.../service/schema/SchemaRegistryServiceImpl.java | 101 ++++++++++++++-------
.../broker/service/schema/SchemaStorage.java | 2 +-
.../broker/service/schema/SchemaServiceTest.java | 17 ++++
4 files changed, 102 insertions(+), 42 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index c5d67bd..78d3e8a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -25,6 +25,7 @@ import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.Functions.newSchemaEntry;
+import static
org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.NO_DELETED_VERSION;
import com.google.common.annotations.VisibleForTesting;
@@ -107,8 +108,8 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
}
@Override
- public CompletableFuture<SchemaVersion> put(String key, byte[] value,
byte[] hash) {
- return putSchemaIfAbsent(key, value,
hash).thenApply(LongSchemaVersion::new);
+ public CompletableFuture<SchemaVersion> put(String key, byte[] value,
byte[] hash, long maxDeletedVersion) {
+ return putSchemaIfAbsent(key, value, hash,
maxDeletedVersion).thenApply(LongSchemaVersion::new);
}
@Override
@@ -139,7 +140,7 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
.thenApply(entry -> new StoredSchema
(
entry.getSchemaData().toByteArray(),
- new
LongSchemaVersion(schemaLocator.getInfo().getVersion())
+ new LongSchemaVersion(indexEntry.getVersion())
)
)
));
@@ -279,7 +280,7 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
}
@NotNull
- private CompletableFuture<Long> putSchemaIfAbsent(String schemaId, byte[]
data, byte[] hash) {
+ private CompletableFuture<Long> putSchemaIfAbsent(String schemaId, byte[]
data, byte[] hash, long maxDeletedVersion) {
return
getSchemaLocator(getSchemaPath(schemaId)).thenCompose(optLocatorEntry -> {
if (optLocatorEntry.isPresent()) {
@@ -294,7 +295,7 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
log.debug("[{}] findSchemaEntryByHash - hash={}",
schemaId, hash);
}
- return findSchemaEntryByHash(locator.getIndexList(),
hash).thenCompose(version -> {
+ return findSchemaEntryByHash(locator.getIndexList(), hash,
maxDeletedVersion).thenCompose(version -> {
if (isNull(version)) {
return addNewSchemaEntryToStore(schemaId,
locator.getIndexList(), data).thenCompose(
position -> updateSchemaLocator(schemaId,
optLocatorEntry.get(), position, hash));
@@ -312,7 +313,7 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
// There was a race condition on the schema
creation. Since it has now been created,
// retry the whole operation so that we have a
chance to recover without bubbling error
// back to producer/consumer
- putSchemaIfAbsent(schemaId, data, hash)
+ putSchemaIfAbsent(schemaId, data, hash,
NO_DELETED_VERSION)
.thenAccept(version ->
future.complete(version))
.exceptionally(ex2 -> {
future.completeExceptionally(ex2);
@@ -441,7 +442,8 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
@NotNull
private CompletableFuture<Long> findSchemaEntryByHash(
List<SchemaStorageFormat.IndexEntry> index,
- byte[] hash
+ byte[] hash,
+ long maxDeletedVersion
) {
if (index.isEmpty()) {
@@ -450,7 +452,11 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
for (SchemaStorageFormat.IndexEntry entry : index) {
if (Arrays.equals(entry.getHash().toByteArray(), hash)) {
- return completedFuture(entry.getVersion());
+ if (entry.getVersion() > maxDeletedVersion) {
+ return completedFuture(entry.getVersion());
+ } else {
+ return completedFuture(null);
+ }
}
}
@@ -458,7 +464,7 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
return completedFuture(null);
} else {
return readSchemaEntry(index.get(0).getPosition())
- .thenCompose(entry ->
findSchemaEntryByHash(entry.getIndexList(), hash));
+ .thenCompose(entry ->
findSchemaEntryByHash(entry.getIndexList(), hash, maxDeletedVersion));
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 4014365..e2975a1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.broker.service.schema;
import static java.util.Objects.isNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE;
+import static
org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy.FORWARD_TRANSITIVE;
+import static
org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy.FULL_TRANSITIVE;
import static
org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toMap;
import static
org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs;
@@ -43,6 +46,7 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
@@ -52,6 +56,7 @@ public class SchemaRegistryServiceImpl implements
SchemaRegistryService {
private final Map<SchemaType, SchemaCompatibilityCheck>
compatibilityChecks;
private final SchemaStorage schemaStorage;
private final Clock clock;
+ protected static final long NO_DELETED_VERSION = -1L;
@VisibleForTesting
SchemaRegistryServiceImpl(SchemaStorage schemaStorage, Map<SchemaType,
SchemaCompatibilityCheck> compatibilityChecks, Clock clock) {
@@ -106,19 +111,33 @@ public class SchemaRegistryServiceImpl implements
SchemaRegistryService {
@NotNull
public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId,
SchemaData schema,
SchemaCompatibilityStrategy strategy) {
- return getSchema(schemaId)
+ return getSchema(schemaId, SchemaVersion.Latest)
.thenCompose(
(existingSchema) ->
{
- if (existingSchema == null ||
existingSchema.schema.isDeleted()) {
- return completedFuture(true);
+ CompletableFuture<KeyValue<Long, Boolean>> keyValue;
+ if (existingSchema == null) {
+ keyValue = completedFuture(new
KeyValue<>(NO_DELETED_VERSION, true));
+ } else if (existingSchema.schema.isDeleted()) {
+ keyValue = completedFuture(new
KeyValue<>(((LongSchemaVersion)schemaStorage
+
.versionFromBytes(existingSchema.version.bytes())).getVersion(), true));
} else {
- return isCompatible(schemaId, schema, strategy);
+ if (isTransitiveStrategy(strategy)) {
+ keyValue = checkCompatibilityWithAll(schemaId,
schema, strategy);
+
+ } else {
+ keyValue =
trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList ->
+
completedFuture(((LongSchemaVersion)schemaStorage
+
.versionFromBytes(schemaAndMetadataList.get(0).version.bytes())).getVersion() -
1L))
+ .thenCompose(maxDeleteVersion ->
isCompatible(schemaId, schema, strategy)
+ .thenCompose(isCompatible ->
completedFuture(new KeyValue<>(maxDeleteVersion, isCompatible))));
+ }
}
+ return keyValue;
}
)
- .thenCompose(isCompatible -> {
- if (isCompatible) {
+ .thenCompose(keyValue -> {
+ if (keyValue.getValue()) {
byte[] context =
hashFunction.hashBytes(schema.getData()).asBytes();
SchemaRegistryFormat.SchemaInfo info =
SchemaRegistryFormat.SchemaInfo.newBuilder()
.setType(Functions.convertFromDomainType(schema.getType()))
@@ -129,7 +148,9 @@ public class SchemaRegistryServiceImpl implements
SchemaRegistryService {
.setTimestamp(clock.millis())
.addAllProps(toPairs(schema.getProps()))
.build();
- return schemaStorage.put(schemaId, info.toByteArray(),
context);
+
+ return schemaStorage.put(schemaId, info.toByteArray(),
context, keyValue.getKey());
+
} else {
return FutureUtil.failedFuture(new
IncompatibleSchemaException());
}
@@ -140,7 +161,19 @@ public class SchemaRegistryServiceImpl implements
SchemaRegistryService {
@NotNull
public CompletableFuture<SchemaVersion> deleteSchema(String schemaId,
String user) {
byte[] deletedEntry = deleted(schemaId, user).toByteArray();
- return schemaStorage.put(schemaId, deletedEntry, new byte[]{});
+ return
trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList ->
+ schemaStorage.put(schemaId, deletedEntry, new byte[]{},
((LongSchemaVersion)schemaStorage
+
.versionFromBytes(schemaAndMetadataList.get(0).version.bytes())).getVersion() -
1L));
+
+ }
+
+ private static boolean isTransitiveStrategy(SchemaCompatibilityStrategy
strategy) {
+ if (FORWARD_TRANSITIVE.equals(strategy)
+ || BACKWARD_TRANSITIVE.equals(strategy)
+ || FULL_TRANSITIVE.equals(strategy)) {
+ return true;
+ }
+ return false;
}
@Override
@@ -150,7 +183,8 @@ public class SchemaRegistryServiceImpl implements
SchemaRegistryService {
case FORWARD_TRANSITIVE:
case BACKWARD_TRANSITIVE:
case FULL_TRANSITIVE:
- return checkCompatibilityWithAll(schemaId, schema, strategy);
+ return checkCompatibilityWithAll(schemaId, schema, strategy)
+ .thenCompose(keyValue ->
completedFuture(keyValue.getValue()));
default:
return checkCompatibilityWithLatest(schemaId, schema,
strategy);
}
@@ -195,30 +229,33 @@ public class SchemaRegistryServiceImpl implements
SchemaRegistryService {
&& isCompatible(existingSchema, schema, strategy));
}
- private CompletableFuture<Boolean> checkCompatibilityWithAll(String
schemaId, SchemaData schema,
-
SchemaCompatibilityStrategy strategy) {
- return getAllSchemas(schemaId)
- .thenCompose(FutureUtils::collect)
- .thenApply(list -> {
- // Trim the prefix of schemas before the latest delete.
- int lastIndex = list.size() - 1;
- for (int i = lastIndex; i >= 0; i--) {
- if (list.get(i).schema.isDeleted()) {
- if (i == lastIndex) { // if the latest schema is a
delete, there's no schemas to compare
- return
Collections.<SchemaAndMetadata>emptyList();
- } else {
- return list.subList(i + 1, list.size());
- }
- }
+ private CompletableFuture<KeyValue<Long, Boolean>>
checkCompatibilityWithAll(String schemaId, SchemaData schema,
+
SchemaCompatibilityStrategy strategy) {
+ return
trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList ->
+ completedFuture(new
KeyValue<>(((LongSchemaVersion)schemaStorage.versionFromBytes(schemaAndMetadataList.get(0).version.bytes())).getVersion()
- 1L,
+ compatibilityChecks.getOrDefault(schema.getType(),
SchemaCompatibilityCheck.DEFAULT)
+ .isCompatible(schemaAndMetadataList
+ .stream()
+ .map(schemaAndMetadata ->
schemaAndMetadata.schema)
+ .collect(Collectors.toList()), schema,
strategy)))
+ );
+ }
+
+ private CompletableFuture<List<SchemaAndMetadata>>
trimDeletedSchemaAndGetList(String schemaId) {
+ return
getAllSchemas(schemaId).thenCompose(FutureUtils::collect).thenApply(list -> {
+ // Trim the prefix of schemas before the latest delete.
+ int lastIndex = list.size() - 1;
+ for (int i = lastIndex; i >= 0; i--) {
+ if (list.get(i).schema.isDeleted()) {
+ if (i == lastIndex) { // if the latest schema is a delete,
there's no schemas to compare
+ return Collections.<SchemaAndMetadata>emptyList();
+ } else {
+ return list.subList(i + 1, list.size());
}
- return list;
- })
- .thenApply(schemaAndMetadataList -> schemaAndMetadataList
- .stream()
- .map(schemaAndMetadata -> schemaAndMetadata.schema)
- .collect(Collectors.toList()))
- .thenApply(schemas ->
compatibilityChecks.getOrDefault(schema.getType(),
SchemaCompatibilityCheck.DEFAULT)
- .isCompatible(schemas, schema, strategy));
+ }
+ }
+ return list;
+ });
}
interface Functions {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
index f133666..e59fa33 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
@@ -24,7 +24,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaVersion;
public interface SchemaStorage {
- CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[]
hash);
+ CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[]
hash, long maxDeletedVersion);
CompletableFuture<StoredSchema> get(String key, SchemaVersion version);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
index ffe7532..d998fdd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -111,6 +111,23 @@ public class SchemaServiceTest extends
MockedPulsarServiceBaseTest {
}
@Test
+ public void deleteSchemaAndAddSchema() throws Exception {
+ putSchema(schemaId1, schema1, version(0));
+ SchemaData latest = getLatestSchema(schemaId1, version(0));
+ assertEquals(schema1, latest);
+
+ deleteSchema(schemaId1, version(1));
+
+ assertNull(schemaRegistryService.getSchema(schemaId1).get());
+
+ putSchema(schemaId1, schema1, version(2));
+
+ latest = getLatestSchema(schemaId1, version(2));
+ assertEquals(schema1, latest);
+
+ }
+
+ @Test
public void getReturnsTheLastWrittenEntry() throws Exception {
putSchema(schemaId1, schema1, version(0));
putSchema(schemaId1, schema2, version(1));