This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3d5d6f6 [pulsar-broker] Fix: handle topic loading failure due to
broken schema ledger (#9212)
3d5d6f6 is described below
commit 3d5d6f6a1681cf842a2e3334d412fa449c553fd0
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Sat Feb 6 20:58:32 2021 -0800
[pulsar-broker] Fix: handle topic loading failure due to broken schema
ledger (#9212)
add more error log
fix list assignment
---
.../service/schema/BookkeeperSchemaStorage.java | 52 ++++++++++++++--
.../schema/DefaultSchemaRegistryService.java | 5 ++
.../broker/service/schema/SchemaRegistry.java | 2 +
.../service/schema/SchemaRegistryServiceImpl.java | 71 ++++++++++++++++++----
.../service/schema/exceptions/SchemaException.java | 10 +++
...hemaRegistryServiceWithSchemaDataValidator.java | 7 ++-
.../broker/service/schema/ClientGetSchemaTest.java | 67 ++++++++++++++++++++
.../java/org/apache/pulsar/schema/SchemaTest.java | 16 +++--
.../common/protocol/schema/SchemaStorage.java | 2 +
9 files changed, 206 insertions(+), 26 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 475e6f9..3c29da8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -37,7 +37,9 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import javax.validation.constraints.NotNull;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -47,6 +49,7 @@ import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.protocol.schema.StoredSchema;
@@ -129,7 +132,7 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
@Override
public CompletableFuture<List<CompletableFuture<StoredSchema>>>
getAll(String key) {
CompletableFuture<List<CompletableFuture<StoredSchema>>> result = new
CompletableFuture<>();
- getSchemaLocator(getSchemaPath(key)).thenAccept(locator -> {
+ getLocator(key).thenAccept(locator -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Get all schemas - locator: {}", key, locator);
}
@@ -154,9 +157,42 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
return result;
}
+ private CompletableFuture<Optional<LocatorEntry>> getLocator(String key) {
+ return getSchemaLocator(getSchemaPath(key));
+ }
+
+ public void clearLocatorCache(String key) {
+ localZkCache.invalidate(getSchemaPath(key));
+ }
+
+ @VisibleForTesting
+ List<Long> getSchemaLedgerList(String key) throws IOException {
+ Optional<LocatorEntry> locatorEntry = null;
+ try {
+ locatorEntry = getLocator(key).get();
+ } catch (Exception e) {
+ log.warn("Failed to get list of schema-storage ledger for {}", key,
+ (e instanceof ExecutionException ? e.getCause() : e));
+ throw new IOException("Failed to get schema ledger for" + key);
+ }
+ LocatorEntry entry = locatorEntry.orElse(null);
+ return entry != null ? entry.locator.getIndexList().stream().map(i ->
i.getPosition().getLedgerId())
+ .collect(Collectors.toList()) : null;
+ }
+
+ @VisibleForTesting
+ BookKeeper getBookKeeper() {
+ return bookKeeper;
+ }
+
+ @Override
+ public CompletableFuture<SchemaVersion> delete(String key, boolean
forcefully) {
+ return deleteSchema(key, forcefully).thenApply(LongSchemaVersion::new);
+ }
+
@Override
public CompletableFuture<SchemaVersion> delete(String key) {
- return deleteSchema(key).thenApply(LongSchemaVersion::new);
+ return delete(key, false);
}
@NotNull
@@ -350,9 +386,10 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
}
@NotNull
- private CompletableFuture<Long> deleteSchema(String schemaId) {
- return getSchema(schemaId).thenCompose(schemaAndVersion -> {
- if (isNull(schemaAndVersion)) {
+ private CompletableFuture<Long> deleteSchema(String schemaId, boolean
forceFully) {
+ return (forceFully ? CompletableFuture.completedFuture(null) :
getSchema(schemaId))
+ .thenCompose(schemaAndVersion -> {
+ if (!forceFully && isNull(schemaAndVersion)) {
return completedFuture(null);
} else {
// The version is only for the compatibility of the current
interface
@@ -373,6 +410,7 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
} catch (InterruptedException |
KeeperException e) {
future.completeExceptionally(e);
}
+ clearLocatorCache(getSchemaPath(schemaId));
future.complete(version);
}
}, null);
@@ -657,6 +695,8 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
if (entryId != -1) {
message += " - entry=" + entryId;
}
- return new IOException(message);
+ boolean recoverable = rc !=
BKException.Code.NoSuchLedgerExistsException
+ && rc != BKException.Code.NoSuchEntryException;
+ return new SchemaException(recoverable, message);
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
index 56ccb68..8f98947 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
@@ -81,6 +81,11 @@ public class DefaultSchemaRegistryService implements
SchemaRegistryService {
}
@Override
+ public CompletableFuture<SchemaVersion> deleteSchemaStorage(String
schemaId, boolean forcefully) {
+ return completedFuture(null);
+ }
+
+ @Override
public CompletableFuture<Boolean> isCompatible(String schemaId, SchemaData
schema,
SchemaCompatibilityStrategy
strategy) {
return completedFuture(false);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
index b756869..f99e8f6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
@@ -41,6 +41,8 @@ public interface SchemaRegistry extends AutoCloseable {
CompletableFuture<SchemaVersion> deleteSchemaStorage(String schemaId);
+ CompletableFuture<SchemaVersion> deleteSchemaStorage(String schemaId,
boolean forcefully);
+
CompletableFuture<Boolean> isCompatible(String schemaId, SchemaData schema,
SchemaCompatibilityStrategy
strategy);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index d16f362..364628f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.broker.service.schema;
import static java.util.Objects.isNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
-import static
org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toMap;
import static
org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs;
import static
org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE;
import static
org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy.FORWARD_TRANSITIVE;
@@ -40,9 +39,11 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.validation.constraints.NotNull;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.commons.collections.CollectionUtils;
import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
+import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.SchemaData;
@@ -54,6 +55,7 @@ import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
+@Slf4j
public class SchemaRegistryServiceImpl implements SchemaRegistryService {
private static HashFunction hashFunction = Hashing.sha256();
private final Map<SchemaType, SchemaCompatibilityCheck>
compatibilityChecks;
@@ -176,7 +178,12 @@ public class SchemaRegistryServiceImpl implements
SchemaRegistryService {
@Override
public CompletableFuture<SchemaVersion> deleteSchemaStorage(String
schemaId) {
- return schemaStorage.delete(schemaId);
+ return deleteSchemaStorage(schemaId, false);
+ }
+
+ @Override
+ public CompletableFuture<SchemaVersion> deleteSchemaStorage(String
schemaId, boolean forcefully) {
+ return schemaStorage.delete(schemaId, forcefully);
}
@Override
@@ -345,20 +352,58 @@ public class SchemaRegistryServiceImpl implements
SchemaRegistryService {
}
public CompletableFuture<List<SchemaAndMetadata>>
trimDeletedSchemaAndGetList(String schemaId) {
- return
getAllSchemas(schemaId).thenCompose(FutureUtils::collect).thenApply(list -> {
- // Trim the prefix of schemas before the latest delete.
- int lastIndex = list.size() - 1;
- for (int i = lastIndex; i >= 0; i--) {
- if (list.get(i).schema.isDeleted()) {
- if (i == lastIndex) { // if the latest schema is a delete,
there's no schemas to compare
- return Collections.emptyList();
- } else {
- return list.subList(i + 1, list.size());
- }
+
+ CompletableFuture<List<SchemaAndMetadata>> schemaResult = new
CompletableFuture<>();
+ CompletableFuture<List<CompletableFuture<SchemaAndMetadata>>>
schemaFutureList = getAllSchemas(schemaId);
+ schemaFutureList.thenCompose(FutureUtils::collect).handle((schemaList,
ex) -> {
+ List<SchemaAndMetadata> list = ex != null ? new ArrayList<>() :
schemaList;
+ if (ex != null) {
+ boolean recoverable = ex.getCause() != null && (ex.getCause()
instanceof SchemaException)
+ ? ((SchemaException) ex.getCause()).isRecoverable()
+ : true;
+ // if error is recoverable then fail the request.
+ if (recoverable) {
+ schemaResult.completeExceptionally(ex.getCause());
+ return null;
}
+ // non-recoverable error: keep the schemas that were read and delete the
broken schema from zk
+
schemaFutureList.getNow(Collections.emptyList()).forEach(schemaFuture -> {
+ if (!schemaFuture.isCompletedExceptionally()) {
+ list.add(schemaFuture.getNow(null));
+ return;
+ }
+ });
+ trimDeletedSchemaAndGetList(list);
+ // clean up the broken schema from zk
+ deleteSchemaStorage(schemaId, true).handle((sv, th) -> {
+ log.info("Clean up non-recoverable schema {}. Deletion of
schema {} {}", ex.getCause().getMessage(),
+ schemaId, (th == null ? "successful" : "failed, "
+ th.getCause().getMessage()));
+ schemaResult.complete(list);
+ return null;
+ });
+ return null;
}
- return list;
+ // trim the deleted schema and return the result if schema is
retrieved successfully
+ List<SchemaAndMetadata> trimmed =
trimDeletedSchemaAndGetList(list);
+ schemaResult.complete(trimmed);
+ return null;
});
+ return schemaResult;
+ }
+
+ private List<SchemaAndMetadata>
trimDeletedSchemaAndGetList(List<SchemaAndMetadata> list) {
+ // Trim the prefix of schemas before the latest delete.
+ int lastIndex = list.size() - 1;
+ for (int i = lastIndex; i >= 0; i--) {
+ if (list.get(i).schema.isDeleted()) {
+ if (i == lastIndex) { // if the latest schema is a delete,
there's no schemas to compare
+ return Collections.emptyList();
+ } else {
+ return list.subList(i + 1, list.size());
+ }
+ }
+ }
+ return list;
}
interface Functions {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/SchemaException.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/SchemaException.java
index 8d85b39..e9267d6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/SchemaException.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/SchemaException.java
@@ -26,6 +26,12 @@ import
org.apache.pulsar.broker.service.BrokerServiceException;
public class SchemaException extends BrokerServiceException {
private static final long serialVersionUID = -6587520779026691815L;
+ private boolean recoverable;
+
+ public SchemaException(boolean recoverable, String message) {
+ super(message);
+ this.recoverable = recoverable;
+ }
public SchemaException(String message) {
super(message);
@@ -38,4 +44,8 @@ public class SchemaException extends BrokerServiceException {
public SchemaException(String message, Throwable cause) {
super(message, cause);
}
+
+ public boolean isRecoverable() {
+ return recoverable;
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java
index 99a33e0..25704cc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java
@@ -103,7 +103,12 @@ public class SchemaRegistryServiceWithSchemaDataValidator
implements SchemaRegis
@Override
public CompletableFuture<SchemaVersion> deleteSchemaStorage(String
schemaId) {
- return service.deleteSchemaStorage(schemaId);
+ return deleteSchemaStorage(schemaId, false);
+ }
+
+ @Override
+ public CompletableFuture<SchemaVersion> deleteSchemaStorage(String
schemaId, boolean forcefully) {
+ return service.deleteSchemaStorage(schemaId, forcefully);
}
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
index dc35286..8ce8552 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.broker.service.schema;
+import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
+import static
org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName;
+import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertEquals;
import java.util.ArrayList;
@@ -26,6 +29,7 @@ import java.util.Optional;
import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
@@ -33,11 +37,16 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.schema.Schemas;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import com.google.common.collect.Sets;
+
public class ClientGetSchemaTest extends ProducerConsumerBase {
private static final String topicBytes = "my-property/my-ns/topic-bytes";
@@ -104,4 +113,62 @@ public class ClientGetSchemaTest extends
ProducerConsumerBase {
assertEquals(client.getSchema(topicAvro).join(),
Optional.of(Schema.AVRO(MyClass.class).getSchemaInfo()));
}
+ /**
+ * Validates that if a schema ledger is deleted or non-recoverable, the
broker cleans up the schema storage for the topic
+ * and makes the topic available.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testSchemaFailure() throws Exception {
+ final String tenant = PUBLIC_TENANT;
+ final String namespace = "test-namespace-" + randomName(16);
+ final String topicOne = "test-broken-schema-storage";
+ final String fqtnOne = TopicName.get(TopicDomain.persistent.value(),
tenant, namespace, topicOne).toString();
+
+ admin.namespaces().createNamespace(tenant + "/" + namespace,
Sets.newHashSet("test"));
+
+ // (1) create topic with schema
+ Producer<Schemas.PersonTwo> producer = pulsarClient
+ .newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>
builder().withAlwaysAllowNull(false)
+
.withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build()))
+ .topic(fqtnOne).create();
+
+ producer.close();
+
+ String key = TopicName.get(fqtnOne).getSchemaName();
+ BookkeeperSchemaStorage schemaStrogate = (BookkeeperSchemaStorage)
pulsar.getSchemaStorage();
+ long schemaLedgerId = schemaStrogate.getSchemaLedgerList(key).get(0);
+
+ // (2) break schema locator by deleting schema-ledger
+ schemaStrogate.getBookKeeper().deleteLedger(schemaLedgerId);
+
+ admin.topics().unload(fqtnOne);
+
+ // (3) create topic again: broker should handle broken schema and load
the topic successfully
+ producer = pulsarClient
+ .newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>
builder().withAlwaysAllowNull(false)
+
.withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build()))
+ .topic(fqtnOne).create();
+
+ assertNotEquals(schemaLedgerId,
schemaStrogate.getSchemaLedgerList(key).get(0).longValue());
+
+ Schemas.PersonTwo personTwo = new Schemas.PersonTwo();
+ personTwo.setId(1);
+ personTwo.setName("Tom");
+
+ Consumer<Schemas.PersonTwo> consumer = pulsarClient
+ .newConsumer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>
builder().withAlwaysAllowNull(false)
+
.withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build()))
+ .subscriptionName("test").topic(fqtnOne).subscribe();
+
+ producer.send(personTwo);
+
+ Schemas.PersonTwo personConsume = consumer.receive().getValue();
+ assertEquals("Tom", personConsume.getName());
+ assertEquals(1, personConsume.getId());
+
+ producer.close();
+ consumer.close();
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 2da8401..b8580fd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -18,9 +18,15 @@
*/
package org.apache.pulsar.schema;
-import com.google.common.collect.Sets;
-import lombok.extern.slf4j.Slf4j;
+import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
+import static
org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import java.util.Collections;
+
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
@@ -35,11 +41,9 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.util.Collections;
+import com.google.common.collect.Sets;
-import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
-import static
org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName;
-import static org.junit.Assert.assertEquals;
+import lombok.extern.slf4j.Slf4j;
@Slf4j
public class SchemaTest extends MockedPulsarServiceBaseTest {
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java
index 9f007aa..c34154b 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java
@@ -32,6 +32,8 @@ public interface SchemaStorage {
CompletableFuture<List<CompletableFuture<StoredSchema>>> getAll(String
key);
+ CompletableFuture<SchemaVersion> delete(String key, boolean forcefully);
+
CompletableFuture<SchemaVersion> delete(String key);
SchemaVersion versionFromBytes(byte[] version);