This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit aafba1ac18dc729b10de62eca0e2df91025e5bce Author: Jiwei Guo <[email protected]> AuthorDate: Sat Mar 12 11:11:33 2022 +0800 Fix inconsistent prompt message when schema version is empty using AVRO. (#14626) (cherry picked from commit 190e5dbccda455a84ea7fdf491dac52cc50fbbdf) --- .../apache/pulsar/broker/service/ServerCnx.java | 5 ++ .../java/org/apache/pulsar/schema/SchemaTest.java | 76 +++++++++++++++++++++- .../client/impl/BinaryProtoLookupService.java | 8 ++- .../pulsar/client/impl/HttpLookupService.java | 5 ++ 4 files changed, 89 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 2ba51d2..c18d525 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1923,6 +1923,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { long requestId = commandGetSchema.getRequestId(); SchemaVersion schemaVersion = SchemaVersion.Latest; if (commandGetSchema.hasSchemaVersion()) { + if (commandGetSchema.getSchemaVersion().length == 0) { + commandSender.sendGetSchemaErrorResponse(requestId, ServerError.IncompatibleSchema, + "Empty schema version"); + return; + } schemaVersion = schemaService.versionFromBytes(commandGetSchema.getSchemaVersion()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index ab78988..c45888f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -29,12 +29,13 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals; +import com.google.common.base.Throwables; +import lombok.EqualsAndHashCode; import org.apache.avro.Schema.Parser; - import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.Sets; - import java.io.ByteArrayInputStream; +import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashMap; @@ -44,7 +45,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; - import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BKException; @@ -59,7 +59,9 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.SchemaDefinition; @@ -1064,4 +1066,72 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { } } + @Test + public void testAvroSchemaWithHttpLookup() throws Exception { + stopBroker(); + isTcpLookup = false; + setup(); + testEmptySchema(); + } + + @Test + public void testAvroSchemaWithTcpLookup() throws Exception { + stopBroker(); + isTcpLookup = true; + setup(); + testEmptySchema(); + } + + private void testEmptySchema() throws Exception { + final String namespace = "test-namespace-" + randomName(16); + String ns = PUBLIC_TENANT + "/" + namespace; + admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME)); + + final String autoProducerTopic = getTopicName(ns, "testEmptySchema"); + + @Cleanup + Consumer<User> consumer = pulsarClient + .newConsumer(Schema.AVRO(User.class)) + .topic(autoProducerTopic) + .subscriptionType(SubscriptionType.Shared) + .subscriptionName("sub-1") + .subscribe(); + + @Cleanup + Producer<User> userProducer = pulsarClient + .newProducer(Schema.AVRO(User.class)) + .topic(autoProducerTopic) + .enableBatching(false) + .create(); + + @Cleanup + Producer<byte[]> producer = pulsarClient + .newProducer() + .topic(autoProducerTopic) + .enableBatching(false) + .create(); + + User test = new User("test"); + userProducer.send(test); + producer.send("test".getBytes(StandardCharsets.UTF_8)); + Message<User> message1 = consumer.receive(); + Assert.assertEquals(test, message1.getValue()); + try { + Message<User> message2 = consumer.receive(); + message2.getValue(); + } catch (Throwable ex) { + Assert.assertTrue(Throwables.getRootCause(ex) instanceof SchemaSerializationException); + Assert.assertEquals(Throwables.getRootCause(ex).getMessage(),"Empty schema version"); + } + } + + @EqualsAndHashCode + static class User implements Serializable { + private String name; + public User() {} + public User(String name) { + this.name = name; + } + } + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index eb61a66..ba3281c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse; @@ -217,9 +218,12 @@ public class BinaryProtoLookupService implements LookupService { @Override public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, byte[] version) { - InetSocketAddress socketAddress = serviceNameResolver.resolveHost(); CompletableFuture<Optional<SchemaInfo>> schemaFuture = new CompletableFuture<>(); - + if (version != null && version.length == 0) { + schemaFuture.completeExceptionally(new SchemaSerializationException("Empty schema version")); + return schemaFuture; + } + InetSocketAddress socketAddress = serviceNameResolver.resolveHost(); client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { long requestId = client.newRequestId(); ByteBuf request = Commands.newGetSchema(requestId, topicName.toString(), diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java index f2cc169..72326c3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java @@ -39,6 +39,7 @@ import org.apache.pulsar.client.impl.schema.SchemaUtils; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.NotFoundException; +import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.lookup.data.LookupData; import org.apache.pulsar.common.naming.NamespaceName; @@ -162,6 +163,10 @@ public class HttpLookupService implements LookupService { String schemaName = topicName.getSchemaName(); String path = String.format("admin/v2/schemas/%s/schema", schemaName); if (version != null) { + if (version.length == 0) { + future.completeExceptionally(new SchemaSerializationException("Empty schema version")); + return future; + } path = String.format("admin/v2/schemas/%s/schema/%s", schemaName, ByteBuffer.wrap(version).getLong());
