This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9eca22fbb1551a057007d808f61961f4cced16b6 Author: Rajan Dhabalia <[email protected]> AuthorDate: Sat Feb 6 20:58:32 2021 -0800 [pulsar-broker] Fix: handle topic loading failure due to broken schema ledger (#9212) add more error log fix list assignment (cherry picked from commit 3d5d6f6a1681cf842a2e3334d412fa449c553fd0) --- .../service/schema/BookkeeperSchemaStorage.java | 53 +++++++++++++--- .../schema/DefaultSchemaRegistryService.java | 5 ++ .../broker/service/schema/SchemaRegistry.java | 2 + .../service/schema/SchemaRegistryServiceImpl.java | 73 +++++++++++++++++----- .../service/schema/exceptions/SchemaException.java | 10 +++ ...hemaRegistryServiceWithSchemaDataValidator.java | 7 ++- .../broker/service/schema/ClientGetSchemaTest.java | 67 ++++++++++++++++++++ .../java/org/apache/pulsar/schema/SchemaTest.java | 16 +++-- .../common/protocol/schema/SchemaStorage.java | 2 + 9 files changed, 207 insertions(+), 28 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index 43ac9bf..db948c5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -39,8 +39,9 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; - +import java.util.stream.Collectors; import javax.validation.constraints.NotNull; import org.apache.bookkeeper.client.BKException; @@ -51,6 +52,7 @@ import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils; import org.apache.bookkeeper.util.ZkUtils; import org.apache.pulsar.broker.PulsarService; import 
org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.schema.exceptions.SchemaException; import org.apache.pulsar.common.protocol.schema.SchemaStorage; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.LongSchemaVersion; @@ -132,7 +134,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage { @Override public CompletableFuture<List<CompletableFuture<StoredSchema>>> getAll(String key) { CompletableFuture<List<CompletableFuture<StoredSchema>>> result = new CompletableFuture<>(); - getSchemaLocator(getSchemaPath(key)).thenAccept(locator -> { + getLocator(key).thenAccept(locator -> { if (log.isDebugEnabled()) { log.debug("[{}] Get all schemas - locator: {}", key, locator); } @@ -157,9 +159,42 @@ public class BookkeeperSchemaStorage implements SchemaStorage { return result; } + private CompletableFuture<Optional<LocatorEntry>> getLocator(String key) { + return getSchemaLocator(getSchemaPath(key)); + } + + public void clearLocatorCache(String key) { + localZkCache.invalidate(getSchemaPath(key)); + } + + @VisibleForTesting + List<Long> getSchemaLedgerList(String key) throws IOException { + Optional<LocatorEntry> locatorEntry = null; + try { + locatorEntry = getLocator(key).get(); + } catch (Exception e) { + log.warn("Failed to get list of schema-storage ledger for {}", key, + (e instanceof ExecutionException ? e.getCause() : e)); + throw new IOException("Failed to get schema ledger for " + key, e); + } + LocatorEntry entry = locatorEntry.orElse(null); + return entry != null ?
entry.locator.getIndexList().stream().map(i -> i.getPosition().getLedgerId()) + .collect(Collectors.toList()) : null; + } + + @VisibleForTesting + BookKeeper getBookKeeper() { + return bookKeeper; + } + + @Override + public CompletableFuture<SchemaVersion> delete(String key, boolean forcefully) { + return deleteSchema(key, forcefully).thenApply(LongSchemaVersion::new); + } + @Override public CompletableFuture<SchemaVersion> delete(String key) { - return deleteSchema(key).thenApply(LongSchemaVersion::new); + return delete(key, false); } @NotNull @@ -350,9 +385,10 @@ public class BookkeeperSchemaStorage implements SchemaStorage { } @NotNull - private CompletableFuture<Long> deleteSchema(String schemaId) { - return getSchema(schemaId).thenCompose(schemaAndVersion -> { - if (isNull(schemaAndVersion)) { + private CompletableFuture<Long> deleteSchema(String schemaId, boolean forceFully) { + return (forceFully ? CompletableFuture.completedFuture(null) : getSchema(schemaId)) + .thenCompose(schemaAndVersion -> { + if (!forceFully && isNull(schemaAndVersion)) { return completedFuture(null); } else { // The version is only for the compatibility of the current interface @@ -373,6 +409,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage { } catch (InterruptedException | KeeperException e) { future.completeExceptionally(e); } + clearLocatorCache(schemaId); future.complete(version); } }, null); @@ -656,6 +693,8 @@ public class BookkeeperSchemaStorage implements SchemaStorage { if (entryId != -1) { message += " - entry=" + entryId; } - return new IOException(message); + boolean recoverable = rc != BKException.Code.NoSuchLedgerExistsException + && rc != BKException.Code.NoSuchEntryException; + return new SchemaException(recoverable, message); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java index 807fb2de..7069e24 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java @@ -83,6 +83,11 @@ public class DefaultSchemaRegistryService implements SchemaRegistryService { } @Override + public CompletableFuture<SchemaVersion> deleteSchemaStorage(String schemaId, boolean forcefully) { + return completedFuture(null); + } + + @Override public CompletableFuture<Boolean> isCompatible(String schemaId, SchemaData schema, SchemaCompatibilityStrategy strategy) { return completedFuture(false); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java index 3f44ed0..4108298 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java @@ -43,6 +43,8 @@ public interface SchemaRegistry extends AutoCloseable { CompletableFuture<SchemaVersion> deleteSchemaStorage(String schemaId); + CompletableFuture<SchemaVersion> deleteSchemaStorage(String schemaId, boolean forcefully); + CompletableFuture<Boolean> isCompatible(String schemaId, SchemaData schema, SchemaCompatibilityStrategy strategy); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java index c06775b..d40c524 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java @@ -20,11 +20,11 
@@ package org.apache.pulsar.broker.service.schema; import static java.util.Objects.isNull; import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs; import static org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE; import static org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy.FORWARD_TRANSITIVE; import static org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy.FULL_TRANSITIVE; import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toMap; -import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs; import com.google.common.annotations.VisibleForTesting; import com.google.common.hash.HashFunction; @@ -42,10 +42,11 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import javax.validation.constraints.NotNull; - +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.commons.collections.CollectionUtils; import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; +import org.apache.pulsar.broker.service.schema.exceptions.SchemaException; import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.protocol.schema.SchemaData; @@ -57,6 +58,7 @@ import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.util.FutureUtil; +@Slf4j public class SchemaRegistryServiceImpl implements SchemaRegistryService { private static HashFunction hashFunction = Hashing.sha256(); private final Map<SchemaType, SchemaCompatibilityCheck> compatibilityChecks; @@ -177,7 +179,12 @@ public class 
SchemaRegistryServiceImpl implements SchemaRegistryService { @Override public CompletableFuture<SchemaVersion> deleteSchemaStorage(String schemaId) { - return schemaStorage.delete(schemaId); + return deleteSchemaStorage(schemaId, false); + } + + @Override + public CompletableFuture<SchemaVersion> deleteSchemaStorage(String schemaId, boolean forcefully) { + return schemaStorage.delete(schemaId, forcefully); } @Override @@ -342,20 +349,58 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService { } public CompletableFuture<List<SchemaAndMetadata>> trimDeletedSchemaAndGetList(String schemaId) { - return getAllSchemas(schemaId).thenCompose(FutureUtils::collect).thenApply(list -> { - // Trim the prefix of schemas before the latest delete. - int lastIndex = list.size() - 1; - for (int i = lastIndex; i >= 0; i--) { - if (list.get(i).schema.isDeleted()) { - if (i == lastIndex) { // if the latest schema is a delete, there's no schemas to compare - return Collections.emptyList(); - } else { - return list.subList(i + 1, list.size()); - } + + CompletableFuture<List<SchemaAndMetadata>> schemaResult = new CompletableFuture<>(); + CompletableFuture<List<CompletableFuture<SchemaAndMetadata>>> schemaFutureList = getAllSchemas(schemaId); + schemaFutureList.thenCompose(FutureUtils::collect).handle((schemaList, ex) -> { + List<SchemaAndMetadata> list = ex != null ? new ArrayList<>() : schemaList; + if (ex != null) { + boolean recoverable = ex.getCause() != null && (ex.getCause() instanceof SchemaException) + ? ((SchemaException) ex.getCause()).isRecoverable() + : true; + // if error is recoverable then fail the request. 
+ if (recoverable) { + schemaResult.completeExceptionally(ex.getCause() != null ? ex.getCause() : ex); + return null; } + // clean the schema list for non-recoverable error and delete the schema from zk + schemaFutureList.getNow(Collections.emptyList()).forEach(schemaFuture -> { + if (!schemaFuture.isCompletedExceptionally()) { + list.add(schemaFuture.getNow(null)); + return; + } + }); + List<SchemaAndMetadata> trimmedList = trimDeletedSchemaAndGetList(list); + // clean up the broken schema from zk + deleteSchemaStorage(schemaId, true).handle((sv, th) -> { + log.info("Clean up non-recoverable schema {}. Deletion of schema {} {}", ex.getCause().getMessage(), + schemaId, (th == null ? "successful" : "failed, " + (th.getCause() != null ? th.getCause() : th).getMessage())); + schemaResult.complete(trimmedList); + return null; + }); + return null; } - return list; + // trim the deleted schema and return the result if schema is retrieved successfully + List<SchemaAndMetadata> trimmed = trimDeletedSchemaAndGetList(list); + schemaResult.complete(trimmed); + return null; }); + return schemaResult; + } + + private List<SchemaAndMetadata> trimDeletedSchemaAndGetList(List<SchemaAndMetadata> list) { + // Trim the prefix of schemas before the latest delete.
+ int lastIndex = list.size() - 1; + for (int i = lastIndex; i >= 0; i--) { + if (list.get(i).schema.isDeleted()) { + if (i == lastIndex) { // if the latest schema is a delete, there's no schemas to compare + return Collections.emptyList(); + } else { + return list.subList(i + 1, list.size()); + } + } + } + return list; } interface Functions { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/SchemaException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/SchemaException.java index 8d85b39..e9267d6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/SchemaException.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/SchemaException.java @@ -26,6 +26,12 @@ import org.apache.pulsar.broker.service.BrokerServiceException; public class SchemaException extends BrokerServiceException { private static final long serialVersionUID = -6587520779026691815L; + private boolean recoverable; + + public SchemaException(boolean recoverable, String message) { + super(message); + this.recoverable = recoverable; + } public SchemaException(String message) { super(message); @@ -38,4 +44,8 @@ public class SchemaException extends BrokerServiceException { public SchemaException(String message, Throwable cause) { super(message, cause); } + + public boolean isRecoverable() { + return recoverable; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java index 570b5bf..a317942 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java @@ -103,7 +103,12 @@ public class SchemaRegistryServiceWithSchemaDataValidator implements SchemaRegis @Override public CompletableFuture<SchemaVersion> deleteSchemaStorage(String schemaId) { - return service.deleteSchemaStorage(schemaId); + return deleteSchemaStorage(schemaId, false); + } + + @Override + public CompletableFuture<SchemaVersion> deleteSchemaStorage(String schemaId, boolean forcefully) { + return service.deleteSchemaStorage(schemaId, forcefully); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java index dc35286..8ce8552 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java @@ -18,6 +18,9 @@ */ package org.apache.pulsar.broker.service.schema; +import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; +import static org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertEquals; import java.util.ArrayList; @@ -26,6 +29,7 @@ import java.util.Optional; import lombok.Cleanup; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; @@ -33,11 +37,16 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.naming.TopicDomain; +import 
org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.schema.Schemas; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import com.google.common.collect.Sets; + public class ClientGetSchemaTest extends ProducerConsumerBase { private static final String topicBytes = "my-property/my-ns/topic-bytes"; @@ -104,4 +113,62 @@ public class ClientGetSchemaTest extends ProducerConsumerBase { assertEquals(client.getSchema(topicAvro).join(), Optional.of(Schema.AVRO(MyClass.class).getSchemaInfo())); } + /** + * It validates if schema ledger is deleted or non recoverable then it will clean up schema storage for the topic + * and make the topic available. + * + * @throws Exception + */ + @Test + public void testSchemaFailure() throws Exception { + final String tenant = PUBLIC_TENANT; + final String namespace = "test-namespace-" + randomName(16); + final String topicOne = "test-broken-schema-storage"; + final String fqtnOne = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, topicOne).toString(); + + admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet("test")); + + // (1) create topic with schema + Producer<Schemas.PersonTwo> producer = pulsarClient + .newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo> builder().withAlwaysAllowNull(false) + .withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build())) + .topic(fqtnOne).create(); + + producer.close(); + + String key = TopicName.get(fqtnOne).getSchemaName(); + BookkeeperSchemaStorage schemaStrogate = (BookkeeperSchemaStorage) pulsar.getSchemaStorage(); + long schemaLedgerId = schemaStrogate.getSchemaLedgerList(key).get(0); + + // (2) break schema locator by deleting schema-ledger + schemaStrogate.getBookKeeper().deleteLedger(schemaLedgerId); + + admin.topics().unload(fqtnOne); + + // (3) create topic again: broker should handle broken 
schema and load the topic successfully + producer = pulsarClient + .newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo> builder().withAlwaysAllowNull(false) + .withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build())) + .topic(fqtnOne).create(); + + assertNotEquals(schemaLedgerId, schemaStrogate.getSchemaLedgerList(key).get(0).longValue()); + + Schemas.PersonTwo personTwo = new Schemas.PersonTwo(); + personTwo.setId(1); + personTwo.setName("Tom"); + + Consumer<Schemas.PersonTwo> consumer = pulsarClient + .newConsumer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo> builder().withAlwaysAllowNull(false) + .withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build())) + .subscriptionName("test").topic(fqtnOne).subscribe(); + + producer.send(personTwo); + + Schemas.PersonTwo personConsume = consumer.receive().getValue(); + assertEquals("Tom", personConsume.getName()); + assertEquals(1, personConsume.getId()); + + producer.close(); + consumer.close(); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index 2da8401..b8580fd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -18,9 +18,15 @@ */ package org.apache.pulsar.schema; -import com.google.common.collect.Sets; -import lombok.extern.slf4j.Slf4j; +import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; +import static org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import java.util.Collections; + import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage; import org.apache.pulsar.client.api.Consumer; import 
org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; @@ -35,11 +41,9 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.util.Collections; +import com.google.common.collect.Sets; -import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; -import static org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName; -import static org.junit.Assert.assertEquals; +import lombok.extern.slf4j.Slf4j; @Slf4j public class SchemaTest extends MockedPulsarServiceBaseTest { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java index 9f007aa..c34154b 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java @@ -32,6 +32,8 @@ public interface SchemaStorage { CompletableFuture<List<CompletableFuture<StoredSchema>>> getAll(String key); + CompletableFuture<SchemaVersion> delete(String key, boolean forcefully); + CompletableFuture<SchemaVersion> delete(String key); SchemaVersion versionFromBytes(byte[] version);
