This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 32e0976bc6e [fix][sql]Fix presto sql avro decode error when publish
non-batched msgs (#17093)
32e0976bc6e is described below
commit 32e0976bc6e9fc5b3aa50102d40aebe6f94f773f
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Fri Aug 19 07:44:45 2022 +0800
[fix][sql]Fix presto sql avro decode error when publish non-batched msgs
(#17093)
---
.../pulsar/common/api/raw/RawMessageImpl.java | 11 +++--
.../pulsar/common/api/raw/RawMessageImplTest.java | 55 +++++++++++++++++++---
2 files changed, 56 insertions(+), 10 deletions(-)
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
index 3aa0cbc34b7..e3c1b4d064f 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
@@ -38,6 +38,7 @@ public class RawMessageImpl implements RawMessage {
private ReferenceCountedMessageMetadata msgMetadata;
private final SingleMessageMetadata singleMessageMetadata = new
SingleMessageMetadata();
+ private volatile boolean setSingleMessageMetadata;
private ByteBuf payload;
private static final Recycler<RawMessageImpl> RECYCLER = new
Recycler<RawMessageImpl>() {
@@ -58,6 +59,7 @@ public class RawMessageImpl implements RawMessage {
msgMetadata.release();
msgMetadata = null;
singleMessageMetadata.clear();
+ setSingleMessageMetadata = false;
payload.release();
handle.recycle(this);
@@ -73,6 +75,7 @@ public class RawMessageImpl implements RawMessage {
if (singleMessageMetadata != null) {
msg.singleMessageMetadata.copyFrom(singleMessageMetadata);
+ msg.setSingleMessageMetadata = true;
}
msg.messageId.ledgerId = ledgerId;
msg.messageId.entryId = entryId;
@@ -91,7 +94,7 @@ public class RawMessageImpl implements RawMessage {
@Override
public Map<String, String> getProperties() {
- if (singleMessageMetadata != null &&
singleMessageMetadata.getPropertiesCount() > 0) {
+ if (setSingleMessageMetadata &&
singleMessageMetadata.getPropertiesCount() > 0) {
return singleMessageMetadata.getPropertiesList().stream()
.collect(Collectors.toMap(KeyValue::getKey,
KeyValue::getValue,
(oldValue,newValue) -> newValue));
@@ -120,7 +123,7 @@ public class RawMessageImpl implements RawMessage {
@Override
public long getEventTime() {
- if (singleMessageMetadata != null &&
singleMessageMetadata.hasEventTime()) {
+ if (setSingleMessageMetadata && singleMessageMetadata.hasEventTime()) {
return singleMessageMetadata.getEventTime();
} else if (msgMetadata.getMetadata().hasEventTime()) {
return msgMetadata.getMetadata().getEventTime();
@@ -141,7 +144,7 @@ public class RawMessageImpl implements RawMessage {
@Override
public Optional<String> getKey() {
- if (singleMessageMetadata != null &&
singleMessageMetadata.hasPartitionKey()) {
+ if (setSingleMessageMetadata &&
singleMessageMetadata.hasPartitionKey()) {
return Optional.of(singleMessageMetadata.getPartitionKey());
} else if (msgMetadata.getMetadata().hasPartitionKey()){
return Optional.of(msgMetadata.getMetadata().getPartitionKey());
@@ -172,7 +175,7 @@ public class RawMessageImpl implements RawMessage {
@Override
public boolean hasBase64EncodedKey() {
- if (singleMessageMetadata != null) {
+ if (setSingleMessageMetadata) {
return singleMessageMetadata.isPartitionKeyB64Encoded();
}
return msgMetadata.getMetadata().isPartitionKeyB64Encoded();
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java
index 037a6ffbe4c..a602cef4843 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java
@@ -18,15 +18,20 @@
*/
package org.apache.pulsar.common.api.raw;
+import static java.util.Collections.singletonList;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import com.google.common.collect.ImmutableMap;
import io.netty.buffer.ByteBuf;
+import java.util.Map;
+import org.apache.pulsar.common.api.proto.KeyValue;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
-import org.mockito.Mockito;
import org.testng.annotations.Test;
-import java.util.Map;
-
-import static org.testng.Assert.assertEquals;
-
public class RawMessageImplTest {
private static final String HARD_CODE_KEY = "__pfn_input_topic__";
@@ -38,7 +43,7 @@ public class RawMessageImplTest {
@Test
public void testGetProperties() {
ReferenceCountedMessageMetadata refCntMsgMetadata =
-
ReferenceCountedMessageMetadata.get(Mockito.mock(ByteBuf.class));
+ ReferenceCountedMessageMetadata.get(mock(ByteBuf.class));
SingleMessageMetadata singleMessageMetadata = new
SingleMessageMetadata();
singleMessageMetadata.addProperty().setKey(HARD_CODE_KEY).setValue(KEY_VALUE_FIRST);
singleMessageMetadata.addProperty().setKey(HARD_CODE_KEY).setValue(KEY_VALUE_SECOND);
@@ -50,4 +55,42 @@ public class RawMessageImplTest {
assertEquals(KEY_VALUE_SECOND, properties.get(HARD_CODE_KEY));
assertEquals(HARD_CODE_KEY_ID_VALUE, properties.get(HARD_CODE_KEY_ID));
}
+
+ @Test
+ public void testNonBatchedMessage() {
+ MessageMetadata messageMetadata = new MessageMetadata();
+ messageMetadata.setPartitionKeyB64Encoded(true);
+ messageMetadata.addAllProperties(singletonList(new
KeyValue().setKey("key1").setValue("value1")));
+ messageMetadata.setEventTime(100L);
+
+ ReferenceCountedMessageMetadata refCntMsgMetadata =
mock(ReferenceCountedMessageMetadata.class);
+ when(refCntMsgMetadata.getMetadata()).thenReturn(messageMetadata);
+
+ // Non-batched message's singleMessageMetadata is null
+ RawMessage msg = RawMessageImpl.get(refCntMsgMetadata, null, null, 0,
0, 0);
+ assertTrue(msg.hasBase64EncodedKey());
+ assertEquals(msg.getProperties(), ImmutableMap.of("key1", "value1"));
+ assertEquals(msg.getEventTime(), 100L);
+ }
+
+ @Test
+ public void testBatchedMessage() {
+ MessageMetadata messageMetadata = new MessageMetadata();
+ messageMetadata.setPartitionKeyB64Encoded(true);
+ messageMetadata.addAllProperties(singletonList(new
KeyValue().setKey("key1").setValue("value1")));
+ messageMetadata.setEventTime(100L);
+
+ ReferenceCountedMessageMetadata refCntMsgMetadata =
mock(ReferenceCountedMessageMetadata.class);
+ when(refCntMsgMetadata.getMetadata()).thenReturn(messageMetadata);
+
+ SingleMessageMetadata singleMessageMetadata = new
SingleMessageMetadata();
+ singleMessageMetadata.setPartitionKeyB64Encoded(false);
+ singleMessageMetadata.addAllProperties(singletonList(new
KeyValue().setKey("key2").setValue("value2")));
+ singleMessageMetadata.setEventTime(200L);
+
+ RawMessage msg = RawMessageImpl.get(refCntMsgMetadata,
singleMessageMetadata, null, 0, 0, 0);
+ assertFalse(msg.hasBase64EncodedKey());
+ assertEquals(msg.getProperties(), ImmutableMap.of("key2", "value2"));
+ assertEquals(msg.getEventTime(), 200L);
+ }
}
\ No newline at end of file