This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 592c3f8040a10826ec5488a6836209cddaddd263 Author: lipenghui <[email protected]> AuthorDate: Tue Apr 27 06:56:36 2021 +0800 Fix primitive schema upload for ALWAYS_COMPATIBLE strategy. (#10386) * Fix primary schema upload for ALWAYS_COMPATIBLE strategy. * Fix checkstyle. (cherry picked from commit f8716c258f095f8cb0ec44ea7e6b56cb431224f3) --- .../service/schema/BookkeeperSchemaStorage.java | 7 +------ .../SchemaTypeCompatibilityCheckTest.java | 23 ++++++++++++++++++++-- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index 0911ac6..2ecc927 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -31,7 +31,6 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -294,12 +293,8 @@ public class BookkeeperSchemaStorage implements SchemaStorage { return getSchemaLocator(getSchemaPath(schemaId)).thenCompose(optLocatorEntry -> { if (optLocatorEntry.isPresent()) { - // Schema locator was already present + SchemaStorageFormat.SchemaLocator locator = optLocatorEntry.get().locator; - byte[] storedHash = locator.getInfo().getHash().toByteArray(); - if (storedHash.length > 0 && Arrays.equals(storedHash, hash)) { - return completedFuture(locator.getInfo().getVersion()); - } if (log.isDebugEnabled()) { log.debug("[{}] findSchemaEntryByHash - hash={}", schemaId, hash); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java index bc711c5..61cc989 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java @@ -20,7 +20,9 @@ package org.apache.pulsar.schema.compatibility; import com.google.common.collect.Sets; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; @@ -313,17 +315,34 @@ public class SchemaTypeCompatibilityCheckTest extends MockedPulsarServiceBaseTes }; for (Schema<?> schema : schemas) { - pulsarClient.newProducer(schema) + Producer<?> p = pulsarClient.newProducer(schema) .topic(topicName) .create(); + p.close(); } for (Schema<?> schema : schemas) { - pulsarClient.newConsumer(schema) + Consumer<?> c = pulsarClient.newConsumer(schema) .topic(topicName) .subscriptionName(UUID.randomUUID().toString()) .subscribe(); + c.close(); } + + List<SchemaInfo> schemasOfTopic = admin.schemas().getAllSchemas(topicName); + + // bytes[] schema and bytebuffer schema does not upload schema info to the schema registry + assertEquals(schemasOfTopic.size(), schemas.length - 2); + + // Try to upload the schema again. + for (Schema<?> schema : schemas) { + Producer<?> p = pulsarClient.newProducer(schema) + .topic(topicName) + .create(); + p.close(); + } + + assertEquals(schemasOfTopic.size(), schemas.length - 2); } }
