This is an automated email from the ASF dual-hosted git repository. rgao pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 718a95b3f919465500dd15f6eacfca8352c9412c Author: 萧易客 <[email protected]> AuthorDate: Tue Feb 22 17:17:47 2022 +0800 Fix send to deadLetterTopic not working when reach maxRedeliverCount (#14317) When a message reaches maxRedeliverCount, it should be sent to the deadLetterTopic. Since 2.8.0 this mechanism has been broken; the regression was introduced in #9970. (cherry picked from commit 16beb9d97fdc64092c8f3fe6959d6bf20dd0aa13) --- .../apache/pulsar/client/impl/schema/AbstractSchema.java | 7 +++---- .../java/org/apache/pulsar/client/impl/MessageTest.java | 13 ++++++++++++- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java index 8cf7a05..33c2ed1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java @@ -75,14 +75,13 @@ public abstract class AbstractSchema<T> implements Schema<T> { * @param schemaVersion the version * @return the schema at that specific version * @throws SchemaSerializationException in case of unknown schema version - * @throws NullPointerException in case of null schemaVersion + * @throws NullPointerException in case of null schemaVersion and supportSchemaVersioning is true */ public Schema<?> atSchemaVersion(byte[] schemaVersion) throws SchemaSerializationException { - Objects.requireNonNull(schemaVersion); if (!supportSchemaVersioning()) { return this; - } else { - throw new SchemaSerializationException("Not implemented for " + this.getClass()); } + Objects.requireNonNull(schemaVersion); + throw new SchemaSerializationException("Not implemented for " + this.getClass()); } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java index 
6d633e7..13cf4f6 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java @@ -22,8 +22,8 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; - import java.nio.ByteBuffer; +import java.util.Optional; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.api.proto.MessageMetadata; @@ -81,4 +81,15 @@ public class MessageTest { assertFalse(topicMessage.isReplicated()); assertNull(topicMessage.getReplicatedFrom()); } + + @Test + public void testMessageImplGetReaderSchema() { + MessageMetadata builder = new MessageMetadata(); + builder.hasSchemaVersion(); + ByteBuffer payload = ByteBuffer.wrap(new byte[0]); + Message<byte[]> msg = MessageImpl.create(builder, payload, Schema.BYTES, null); + + Optional<Schema<?>> readerSchema = msg.getReaderSchema(); + assertTrue(readerSchema.isPresent()); + } }
