This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push: new 4f65f65c6fe [Java Client] Fix messages sent by producers without schema cannot be decoded (#15622) 4f65f65c6fe is described below commit 4f65f65c6fe6f7e97e83a13d038c6dbf28c3f8c6 Author: Yunze Xu <xyzinfern...@163.com> AuthorDate: Thu May 19 11:49:25 2022 +0800 [Java Client] Fix messages sent by producers without schema cannot be decoded (#15622) ### Motivation When I tried to consume a topic via a consumer with Avro schema while the topic was produced by a producer without schema, the consumption failed. It's because `MultiVersionSchemaInfoProvider#getSchemaByVersion` doesn't check if `schemaVersion` is an empty byte array. If it is, a `BytesSchemaVersion` of an empty array will be passed to `cache.get` and then passed to `loadSchema`. https://github.com/apache/pulsar/blob/f90ef9c6ad88c4f94ce1fcc682bbf3f3189cbf2a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java#L94-L98 However, `LookupService#getSchema` cannot accept an empty byte array as the version, so `loadSchema` failed. The root cause is that the schema version was set unexpectedly when messages were sent by a producer without schema. On the broker side, the returned schema version is never null. If the schema version is an empty array, it means the message doesn't have a schema. However, on the Java client side, the empty byte array is treated as an existing schema and the schema version field will be set. When the consumer receives the message, it will try to load the schema whose version is an empty array. ### Modifications - When a producer receives a response whose schema version is an empty byte array, just ignore it. - Make `MessageImpl#getSchemaVersion` return null if the schema version is an empty byte array so that the consumer can consume messages produced by older-version producers without schema. 
And return the internal schema for `getReaderSchema` when `getSchemaVersion` returns null. - Fix the existing tests. Since producers without a schema won't set the `schema_version` field after this patch, some tests that rely on the precise stats needed to be modified. - Add `testConsumeAvroMessagesWithoutSchema` to cover the case where messages without schema are compatible with the schema. This patch also modifies the existing behavior when `schemaValidationEnforced` is false and messages are produced by a producer without schema and consumed by a consumer with schema. 1. If the message is incompatible with the schema - Before: `getSchemaVersion` returns an empty array and `getValue` fails with `SerializationException`: > org.apache.commons.lang3.SerializationException: Failed at fetching schema info for EMPTY - After: `getSchemaVersion` returns `null` and `getValue` fails with `SchemaSerializationException`. 2. Otherwise (the message is compatible with the schema) - Before: `getSchemaVersion` returns an empty array and `getValue` fails with `SerializationException`. - After: `getSchemaVersion` returns `null` and `getValue` returns the correctly decoded object. 
(cherry picked from commit ecd275dc21f33483a649e5b872990771257b1d45) --- .../pulsar/broker/admin/PersistentTopicsTest.java | 2 +- .../RGUsageMTAggrWaitForAllMsgsTest.java | 7 ++- .../pulsar/broker/stats/PrometheusMetricsTest.java | 2 +- .../apache/pulsar/client/api/SimpleSchemaTest.java | 52 ++++++++++++++++++++-- .../java/org/apache/pulsar/schema/SchemaTest.java | 17 +++---- .../org/apache/pulsar/client/impl/MessageImpl.java | 26 +++++++++-- .../apache/pulsar/client/impl/ProducerImpl.java | 11 +++-- .../pulsar/client/impl/ProducerResponse.java | 14 +++++- 8 files changed, 105 insertions(+), 26 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 78f3d2e7a20..2baf5b0f469 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -737,7 +737,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { completableFuture = batchProducer.sendAsync("a".getBytes()); } completableFuture.get(); - Assert.assertEquals(Optional.ofNullable(admin.topics().getBacklogSizeByMessageId(topicName + "-partition-0", MessageId.earliest)), Optional.of(350L)); + Assert.assertEquals(Optional.ofNullable(admin.topics().getBacklogSizeByMessageId(topicName + "-partition-0", MessageId.earliest)), Optional.of(320L)); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java index fda8693dd84..ce3f033a3b0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java @@ 
-556,7 +556,7 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase { log.debug("verfyProdConsStats: topicStatsMap has {} entries", topicStatsMap.size()); - // Pulsar runtime adds some additional bytes in the exchanges: a 45-byte per-message + // Pulsar runtime adds some additional bytes in the exchanges: a 42-byte per-message // metadata of some kind, plus more as the number of messages increases. // Hence the ">=" assertion with ExpectedNumBytesSent/Received in the following checks. final int ExpectedNumBytesSent = sentNumBytes + PER_MESSAGE_METADATA_OHEAD * sentNumMsgs; @@ -787,9 +787,8 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase { } private static final Logger log = LoggerFactory.getLogger(RGUsageMTAggrWaitForAllMsgsTest.class); - - // Empirically, there appears to be a 45-byte overhead for metadata, imposed by Pulsar runtime. - private static final int PER_MESSAGE_METADATA_OHEAD = 45; + // Empirically, there appears to be a 42-byte overhead for metadata, imposed by Pulsar runtime. 
+ private static final int PER_MESSAGE_METADATA_OHEAD = 42; private static final int PUBLISH_INTERVAL_SECS = 10; private static final int NUM_PRODUCERS = 4; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index 9f098935efa..3a7d23b06e7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -1217,7 +1217,7 @@ public class PrometheusMetricsTest extends BrokerTestBase { assertEquals(cm.get(0).value, 10); cm = (List<Metric>) metrics.get("pulsar_compaction_compacted_entries_size"); assertEquals(cm.size(), 1); - assertEquals(cm.get(0).value, 870); + assertEquals(cm.get(0).value, 840); pulsarClient.close(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java index fd8036eaf9e..983a7f341e0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java @@ -49,7 +49,6 @@ import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.schema.reader.AvroReader; import org.apache.pulsar.client.impl.schema.writer.AvroWriter; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.apache.pulsar.common.schema.SchemaInfo; @@ -62,6 +61,7 @@ import org.testng.annotations.Factory; import org.testng.annotations.Test; import java.io.ByteArrayInputStream; +import java.io.EOFException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; @@ -305,7 +305,13 @@ 
public class SimpleSchemaTest extends ProducerConsumerBase { + " if SchemaValidationEnabled is enabled"); } Message<V2Data> msg3 = c.receive(); - Assert.assertEquals(msg3.getSchemaVersion(), SchemaVersion.Empty.bytes()); + assertNull(msg3.getSchemaVersion()); + try { + msg3.getValue(); + fail("Schema should be incompatible"); + } catch (SchemaSerializationException e) { + assertTrue(e.getCause() instanceof EOFException); + } } catch (PulsarClientException e) { if (schemaValidationEnforced) { Assert.assertTrue(e instanceof IncompatibleSchemaException); @@ -366,7 +372,13 @@ public class SimpleSchemaTest extends ProducerConsumerBase { + " if SchemaValidationEnabled is enabled"); } Message<V2Data> msg3 = c.receive(); - Assert.assertEquals(msg3.getSchemaVersion(), SchemaVersion.Empty.bytes()); + assertNull(msg3.getSchemaVersion()); + try { + msg3.getValue(); + fail("Schema should be incompatible"); + } catch (SchemaSerializationException e) { + assertTrue(e.getCause() instanceof EOFException); + } } catch (PulsarClientException e) { if (schemaValidationEnforced) { Assert.assertTrue(e instanceof IncompatibleSchemaException); @@ -1253,4 +1265,38 @@ public class SimpleSchemaTest extends ProducerConsumerBase { } } + + @Test + public void testConsumeAvroMessagesWithoutSchema() throws Exception { + if (schemaValidationEnforced) { + return; + } + final String topic = "test-consume-avro-messages-without-schema-" + UUID.randomUUID(); + final Schema<V1Data> schema = Schema.AVRO(V1Data.class); + final Consumer<V1Data> consumer = pulsarClient.newConsumer(schema) + .topic(topic) + .subscriptionName("sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + final Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .create(); + + final int numMessages = 5; + for (int i = 0; i < numMessages; i++) { + producer.send(schema.encode(new V1Data(i))); + } + + for (int i = 0; i < numMessages; i++) { + final Message<V1Data> msg = 
consumer.receive(3, TimeUnit.SECONDS); + assertNotNull(msg); + log.info("Received {} from {}", msg.getValue().i, topic); + assertEquals(msg.getValue().i, i); + assertEquals(msg.getReaderSchema().orElse(Schema.BYTES).getSchemaInfo(), schema.getSchemaInfo()); + consumer.acknowledge(msg); + } + + producer.close(); + consumer.close(); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index c45888f3858..682d6a52b78 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -29,7 +29,6 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals; -import com.google.common.base.Throwables; import lombok.EqualsAndHashCode; import org.apache.avro.Schema.Parser; import com.fasterxml.jackson.databind.JsonNode; @@ -1071,7 +1070,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { stopBroker(); isTcpLookup = false; setup(); - testEmptySchema(); + testIncompatibleSchema(); } @Test @@ -1079,10 +1078,10 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { stopBroker(); isTcpLookup = true; setup(); - testEmptySchema(); + testIncompatibleSchema(); } - private void testEmptySchema() throws Exception { + private void testIncompatibleSchema() throws Exception { final String namespace = "test-namespace-" + randomName(16); String ns = PUBLIC_TENANT + "/" + namespace; admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME)); @@ -1116,12 +1115,14 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { producer.send("test".getBytes(StandardCharsets.UTF_8)); Message<User> message1 = consumer.receive(); Assert.assertEquals(test, message1.getValue()); + Message<User> message2 = consumer.receive(); try { - Message<User> message2 = 
consumer.receive(); message2.getValue(); - } catch (Throwable ex) { - Assert.assertTrue(Throwables.getRootCause(ex) instanceof SchemaSerializationException); - Assert.assertEquals(Throwables.getRootCause(ex).getMessage(),"Empty schema version"); + } catch (SchemaSerializationException e) { + final String schemaString = + new String(Schema.AVRO(User.class).getSchemaInfo().getSchema(), StandardCharsets.UTF_8); + Assert.assertTrue(e.getMessage().contains(schemaString)); + Assert.assertTrue(e.getMessage().contains("payload (4 bytes)")); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index 67c176cfe63..fd00475de71 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -31,6 +31,7 @@ import io.netty.util.Recycler.Handle; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.Collections; import java.util.List; @@ -42,6 +43,7 @@ import java.util.stream.Collectors; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.client.impl.schema.AbstractSchema; import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl; @@ -391,12 +393,14 @@ public class MessageImpl<T> implements Message<T> { if (schema == null) { return Optional.empty(); } + byte[] schemaVersion = getSchemaVersion(); + if (schemaVersion == null) { + return Optional.of(schema); + } if (schema instanceof AutoConsumeSchema) { - byte[] schemaVersion = getSchemaVersion(); return Optional.of(((AutoConsumeSchema) schema) .atSchemaVersion(schemaVersion)); } else if 
(schema instanceof AbstractSchema) { - byte[] schemaVersion = getSchemaVersion(); return Optional.of(((AbstractSchema<?>) schema) .atSchemaVersion(schemaVersion)); } else { @@ -404,10 +408,13 @@ public class MessageImpl<T> implements Message<T> { } } + // For messages produced by older version producers without schema, the schema version is an empty byte array + // rather than null. @Override public byte[] getSchemaVersion() { if (msgMetadata.hasSchemaVersion()) { - return msgMetadata.getSchemaVersion(); + byte[] schemaVersion = msgMetadata.getSchemaVersion(); + return (schemaVersion.length == 0) ? null : schemaVersion; } else { return null; } @@ -472,8 +479,19 @@ public class MessageImpl<T> implements Message<T> { } } - private T decode(byte[] schemaVersion) { + try { + return decodeBySchema(schemaVersion); + } catch (ArrayIndexOutOfBoundsException e) { + // It usually means the message was produced without schema check while the message is not compatible with + // the current schema. Therefore, convert it to SchemaSerializationException with a better description. + final int payloadSize = payload.readableBytes(); + throw new SchemaSerializationException("payload (" + payloadSize + " bytes) cannot be decoded with schema " + + new String(schema.getSchemaInfo().getSchema(), StandardCharsets.UTF_8)); + } + } + + private T decodeBySchema(byte[] schemaVersion) { T value = poolMessage ? 
schema.decode(payload.nioBuffer(), schemaVersion) : null; if (value != null) { return value; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 2637c495304..d5d1d6e7382 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -675,9 +675,14 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne } } else { log.info("[{}] [{}] GetOrCreateSchema succeed", topic, producerName); - SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal()); - schemaCache.putIfAbsent(schemaHash, v); - msg.getMessageBuilder().setSchemaVersion(v); + // In broker, if schema version is an empty byte array, it means the topic doesn't have schema. In this + // case, we should not cache the schema version so that the schema version of the message metadata will + // be null, instead of an empty array. 
+ if (v.length != 0) { + SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal()); + schemaCache.putIfAbsent(schemaHash, v); + msg.getMessageBuilder().setSchemaVersion(v); + } msg.setSchemaState(MessageImpl.SchemaState.Ready); } cnx.ctx().channel().eventLoop().execute(() -> { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerResponse.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerResponse.java index 36b47f2b6d6..2c9cfa74d1a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerResponse.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerResponse.java @@ -21,9 +21,9 @@ package org.apache.pulsar.client.impl; import java.util.Optional; import lombok.AllArgsConstructor; -import lombok.Data; +import lombok.Getter; -@Data +@Getter @AllArgsConstructor public class ProducerResponse { private String producerName; @@ -31,4 +31,14 @@ public class ProducerResponse { private byte[] schemaVersion; private Optional<Long> topicEpoch; + + // Shadow the default getter generated by lombok. In broker, if the schema version is an empty byte array, it means + // the topic doesn't have schema. + public byte[] getSchemaVersion() { + if (schemaVersion != null && schemaVersion.length != 0) { + return schemaVersion; + } else { + return null; + } + } }