This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 718a95b3f919465500dd15f6eacfca8352c9412c
Author: 萧易客 <[email protected]>
AuthorDate: Tue Feb 22 17:17:47 2022 +0800

    Fix sending to deadLetterTopic not working when maxRedeliverCount is reached 
(#14317)
    
    If a message reaches maxRedeliverCount, it should be sent to the deadLetterTopic. 
Since 2.8.0 this mechanism has been broken; the breakage was introduced in #9970.
    
    (cherry picked from commit 16beb9d97fdc64092c8f3fe6959d6bf20dd0aa13)
---
 .../apache/pulsar/client/impl/schema/AbstractSchema.java    |  7 +++----
 .../java/org/apache/pulsar/client/impl/MessageTest.java     | 13 ++++++++++++-
 2 files changed, 15 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
index 8cf7a05..33c2ed1 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
@@ -75,14 +75,13 @@ public abstract class AbstractSchema<T> implements 
Schema<T> {
      * @param schemaVersion the version
      * @return the schema at that specific version
      * @throws SchemaSerializationException in case of unknown schema version
-     * @throws NullPointerException in case of null schemaVersion
+     * @throws NullPointerException in case of null schemaVersion and 
supportSchemaVersioning is true
      */
     public Schema<?> atSchemaVersion(byte[] schemaVersion) throws 
SchemaSerializationException {
-        Objects.requireNonNull(schemaVersion);
         if (!supportSchemaVersioning()) {
             return this;
-        } else {
-            throw new SchemaSerializationException("Not implemented for " + 
this.getClass());
         }
+        Objects.requireNonNull(schemaVersion);
+        throw new SchemaSerializationException("Not implemented for " + 
this.getClass());
     }
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java
index 6d633e7..13cf4f6 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java
@@ -22,8 +22,8 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
-
 import java.nio.ByteBuffer;
+import java.util.Optional;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -81,4 +81,15 @@ public class MessageTest {
         assertFalse(topicMessage.isReplicated());
         assertNull(topicMessage.getReplicatedFrom());
     }
+
+    @Test
+    public void testMessageImplGetReaderSchema() {
+        MessageMetadata builder = new MessageMetadata();
+        builder.hasSchemaVersion();
+        ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
+        Message<byte[]> msg = MessageImpl.create(builder, payload, 
Schema.BYTES, null);
+
+        Optional<Schema<?>> readerSchema = msg.getReaderSchema();
+        assertTrue(readerSchema.isPresent());
+    }
 }

Reply via email to