This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 32e0976bc6e [fix][sql]Fix presto sql avro decode error when publish non-batched msgs (#17093)
32e0976bc6e is described below

commit 32e0976bc6e9fc5b3aa50102d40aebe6f94f773f
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Fri Aug 19 07:44:45 2022 +0800

    [fix][sql]Fix presto sql avro decode error when publish non-batched msgs (#17093)
---
 .../pulsar/common/api/raw/RawMessageImpl.java      | 11 +++--
 .../pulsar/common/api/raw/RawMessageImplTest.java  | 55 +++++++++++++++++++---
 2 files changed, 56 insertions(+), 10 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
index 3aa0cbc34b7..e3c1b4d064f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
@@ -38,6 +38,7 @@ public class RawMessageImpl implements RawMessage {
 
     private ReferenceCountedMessageMetadata msgMetadata;
     private final SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
+    private volatile boolean setSingleMessageMetadata;
     private ByteBuf payload;
 
     private static final Recycler<RawMessageImpl> RECYCLER = new Recycler<RawMessageImpl>() {
@@ -58,6 +59,7 @@ public class RawMessageImpl implements RawMessage {
         msgMetadata.release();
         msgMetadata = null;
         singleMessageMetadata.clear();
+        setSingleMessageMetadata = false;
 
         payload.release();
         handle.recycle(this);
@@ -73,6 +75,7 @@ public class RawMessageImpl implements RawMessage {
 
         if (singleMessageMetadata != null) {
             msg.singleMessageMetadata.copyFrom(singleMessageMetadata);
+            msg.setSingleMessageMetadata = true;
         }
         msg.messageId.ledgerId = ledgerId;
         msg.messageId.entryId = entryId;
@@ -91,7 +94,7 @@ public class RawMessageImpl implements RawMessage {
 
     @Override
     public Map<String, String> getProperties() {
-        if (singleMessageMetadata != null && singleMessageMetadata.getPropertiesCount() > 0) {
+        if (setSingleMessageMetadata && singleMessageMetadata.getPropertiesCount() > 0) {
             return singleMessageMetadata.getPropertiesList().stream()
                       .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue,
                               (oldValue,newValue) -> newValue));
@@ -120,7 +123,7 @@ public class RawMessageImpl implements RawMessage {
 
     @Override
     public long getEventTime() {
-        if (singleMessageMetadata != null && singleMessageMetadata.hasEventTime()) {
+        if (setSingleMessageMetadata && singleMessageMetadata.hasEventTime()) {
             return singleMessageMetadata.getEventTime();
         } else if (msgMetadata.getMetadata().hasEventTime()) {
             return msgMetadata.getMetadata().getEventTime();
@@ -141,7 +144,7 @@ public class RawMessageImpl implements RawMessage {
 
     @Override
     public Optional<String> getKey() {
-        if (singleMessageMetadata != null && singleMessageMetadata.hasPartitionKey()) {
+        if (setSingleMessageMetadata && singleMessageMetadata.hasPartitionKey()) {
             return Optional.of(singleMessageMetadata.getPartitionKey());
         } else if (msgMetadata.getMetadata().hasPartitionKey()){
             return Optional.of(msgMetadata.getMetadata().getPartitionKey());
@@ -172,7 +175,7 @@ public class RawMessageImpl implements RawMessage {
 
     @Override
     public boolean hasBase64EncodedKey() {
-        if (singleMessageMetadata != null) {
+        if (setSingleMessageMetadata) {
             return singleMessageMetadata.isPartitionKeyB64Encoded();
         }
         return msgMetadata.getMetadata().isPartitionKeyB64Encoded();
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java
index 037a6ffbe4c..a602cef4843 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java
@@ -18,15 +18,20 @@
  */
 package org.apache.pulsar.common.api.raw;
 
+import static java.util.Collections.singletonList;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import com.google.common.collect.ImmutableMap;
 import io.netty.buffer.ByteBuf;
+import java.util.Map;
+import org.apache.pulsar.common.api.proto.KeyValue;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
-import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
-import java.util.Map;
-
-import static org.testng.Assert.assertEquals;
-
 public class RawMessageImplTest {
 
     private static final String HARD_CODE_KEY = "__pfn_input_topic__";
@@ -38,7 +43,7 @@ public class RawMessageImplTest {
     @Test
     public void testGetProperties() {
         ReferenceCountedMessageMetadata refCntMsgMetadata =
-                ReferenceCountedMessageMetadata.get(Mockito.mock(ByteBuf.class));
+                ReferenceCountedMessageMetadata.get(mock(ByteBuf.class));
         SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
         singleMessageMetadata.addProperty().setKey(HARD_CODE_KEY).setValue(KEY_VALUE_FIRST);
         singleMessageMetadata.addProperty().setKey(HARD_CODE_KEY).setValue(KEY_VALUE_SECOND);
@@ -50,4 +55,42 @@ public class RawMessageImplTest {
         assertEquals(KEY_VALUE_SECOND, properties.get(HARD_CODE_KEY));
         assertEquals(HARD_CODE_KEY_ID_VALUE, properties.get(HARD_CODE_KEY_ID));
     }
+
+    @Test
+    public void testNonBatchedMessage() {
+        MessageMetadata messageMetadata = new MessageMetadata();
+        messageMetadata.setPartitionKeyB64Encoded(true);
+        messageMetadata.addAllProperties(singletonList(new KeyValue().setKey("key1").setValue("value1")));
+        messageMetadata.setEventTime(100L);
+
+        ReferenceCountedMessageMetadata refCntMsgMetadata = mock(ReferenceCountedMessageMetadata.class);
+        when(refCntMsgMetadata.getMetadata()).thenReturn(messageMetadata);
+
+        // Non-batched message's singleMessageMetadata is null
+        RawMessage msg = RawMessageImpl.get(refCntMsgMetadata, null, null, 0, 0, 0);
+        assertTrue(msg.hasBase64EncodedKey());
+        assertEquals(msg.getProperties(), ImmutableMap.of("key1", "value1"));
+        assertEquals(msg.getEventTime(), 100L);
+    }
+
+    @Test
+    public void testBatchedMessage() {
+        MessageMetadata messageMetadata = new MessageMetadata();
+        messageMetadata.setPartitionKeyB64Encoded(true);
+        messageMetadata.addAllProperties(singletonList(new KeyValue().setKey("key1").setValue("value1")));
+        messageMetadata.setEventTime(100L);
+
+        ReferenceCountedMessageMetadata refCntMsgMetadata = mock(ReferenceCountedMessageMetadata.class);
+        when(refCntMsgMetadata.getMetadata()).thenReturn(messageMetadata);
+
+        SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
+        singleMessageMetadata.setPartitionKeyB64Encoded(false);
+        singleMessageMetadata.addAllProperties(singletonList(new KeyValue().setKey("key2").setValue("value2")));
+        singleMessageMetadata.setEventTime(200L);
+
+        RawMessage msg = RawMessageImpl.get(refCntMsgMetadata, singleMessageMetadata, null, 0, 0, 0);
+        assertFalse(msg.hasBase64EncodedKey());
+        assertEquals(msg.getProperties(), ImmutableMap.of("key2", "value2"));
+        assertEquals(msg.getEventTime(), 200L);
+    }
 }
\ No newline at end of file

Reply via email to