This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 66c5e1f1ae4 Revert "[fix][broker] Miss headersAndPayload and messageIdData in MessagePublishContext (#21245)" (#21309)
66c5e1f1ae4 is described below

commit 66c5e1f1ae48e49ed27a72424c372ce444217d86
Author: Penghui Li <[email protected]>
AuthorDate: Mon Oct 9 21:48:27 2023 +0800

    Revert "[fix][broker] Miss headersAndPayload and messageIdData in MessagePublishContext (#21245)" (#21309)
---
 .../org/apache/pulsar/broker/service/Producer.java | 73 ++++++++--------------
 .../apache/pulsar/broker/service/ServerCnx.java    |  8 ++-
 .../org/apache/pulsar/broker/service/Topic.java    |  9 ---
 3 files changed, 30 insertions(+), 60 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 70cca8b8212..f7d2bb2dd27 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -46,7 +46,6 @@ import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
-import org.apache.pulsar.common.api.proto.MessageIdData;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ProducerAccessMode;
 import org.apache.pulsar.common.api.proto.ServerError;
@@ -187,15 +186,14 @@ public class Producer {
     }
 
     public void publishMessage(long producerId, long sequenceId, ByteBuf 
headersAndPayload, long batchSize,
-            boolean isChunked, boolean isMarker, MessageIdData messageIdData) {
-        if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, 
batchSize, messageIdData)) {
-            publishMessageToTopic(headersAndPayload, sequenceId, batchSize, 
isChunked, isMarker, messageIdData);
+            boolean isChunked, boolean isMarker, Position position) {
+        if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, 
batchSize, position)) {
+            publishMessageToTopic(headersAndPayload, sequenceId, batchSize, 
isChunked, isMarker, position);
         }
     }
 
     public void publishMessage(long producerId, long lowestSequenceId, long 
highestSequenceId,
-                               ByteBuf headersAndPayload, long batchSize, 
boolean isChunked, boolean isMarker,
-                               MessageIdData messageIdData) {
+            ByteBuf headersAndPayload, long batchSize, boolean isChunked, 
boolean isMarker, Position position) {
         if (lowestSequenceId > highestSequenceId) {
             cnx.execute(() -> {
                 cnx.getCommandSender().sendSendError(producerId, 
highestSequenceId, ServerError.MetadataError,
@@ -204,15 +202,15 @@ public class Producer {
             });
             return;
         }
-        if (checkAndStartPublish(producerId, highestSequenceId, 
headersAndPayload, batchSize, messageIdData)) {
+        if (checkAndStartPublish(producerId, highestSequenceId, 
headersAndPayload, batchSize, position)) {
             publishMessageToTopic(headersAndPayload, lowestSequenceId, 
highestSequenceId, batchSize, isChunked,
-                    isMarker, messageIdData);
+                    isMarker, position);
         }
     }
 
     public boolean checkAndStartPublish(long producerId, long sequenceId, 
ByteBuf headersAndPayload, long batchSize,
-                                        MessageIdData messageIdData) {
-        if (!isShadowTopic && messageIdData != null) {
+                                        Position position) {
+        if (!isShadowTopic && position != null) {
             cnx.execute(() -> {
                 cnx.getCommandSender().sendSendError(producerId, sequenceId, 
ServerError.NotAllowedError,
                         "Only shadow topic supports sending messages with 
messageId");
@@ -220,7 +218,7 @@ public class Producer {
             });
             return false;
         }
-        if (isShadowTopic && messageIdData == null) {
+        if (isShadowTopic && position == null) {
             cnx.execute(() -> {
                 cnx.getCommandSender().sendSendError(producerId, sequenceId, 
ServerError.NotAllowedError,
                         "Cannot send messages to a shadow topic");
@@ -269,10 +267,10 @@ public class Producer {
     }
 
     private void publishMessageToTopic(ByteBuf headersAndPayload, long 
sequenceId, long batchSize, boolean isChunked,
-                                       boolean isMarker, MessageIdData 
messageIdData) {
+                                       boolean isMarker, Position position) {
         MessagePublishContext messagePublishContext =
-                MessagePublishContext.get(this, sequenceId, msgIn, 
headersAndPayload,
-                        batchSize, isChunked, System.nanoTime(), isMarker, 
messageIdData);
+                MessagePublishContext.get(this, sequenceId, msgIn, 
headersAndPayload.readableBytes(),
+                        batchSize, isChunked, System.nanoTime(), isMarker, 
position);
         if (brokerInterceptor != null) {
             brokerInterceptor
                     .onMessagePublish(this, headersAndPayload, 
messagePublishContext);
@@ -281,11 +279,10 @@ public class Producer {
     }
 
     private void publishMessageToTopic(ByteBuf headersAndPayload, long 
lowestSequenceId, long highestSequenceId,
-                                       long batchSize, boolean isChunked, 
boolean isMarker,
-                                       MessageIdData messageIdData) {
+                                       long batchSize, boolean isChunked, 
boolean isMarker, Position position) {
         MessagePublishContext messagePublishContext = 
MessagePublishContext.get(this, lowestSequenceId,
-                highestSequenceId, msgIn, headersAndPayload, batchSize,
-                isChunked, System.nanoTime(), isMarker, messageIdData);
+                highestSequenceId, msgIn, headersAndPayload.readableBytes(), 
batchSize,
+                isChunked, System.nanoTime(), isMarker, position);
         if (brokerInterceptor != null) {
             brokerInterceptor
                     .onMessagePublish(this, headersAndPayload, 
messagePublishContext);
@@ -376,8 +373,6 @@ public class Producer {
         private long sequenceId;
         private long ledgerId;
         private long entryId;
-        private MessageIdData messageIdData;
-        private ByteBuf headerAndPayload;
         private Rate rateIn;
         private int msgSize;
         private long batchSize;
@@ -554,24 +549,21 @@ public class Producer {
             recycle();
         }
 
-        static MessagePublishContext get(Producer producer, long sequenceId, 
Rate rateIn, ByteBuf headersAndPayload,
-                                         long batchSize, boolean chunked, long 
startTimeNs, boolean isMarker,
-                                         MessageIdData messageIdData) {
+        static MessagePublishContext get(Producer producer, long sequenceId, 
Rate rateIn, int msgSize,
+                long batchSize, boolean chunked, long startTimeNs, boolean 
isMarker, Position position) {
             MessagePublishContext callback = RECYCLER.get();
             callback.producer = producer;
             callback.sequenceId = sequenceId;
             callback.rateIn = rateIn;
-            callback.msgSize = headersAndPayload.readableBytes();
+            callback.msgSize = msgSize;
             callback.batchSize = batchSize;
             callback.chunked = chunked;
             callback.originalProducerName = null;
             callback.originalSequenceId = -1L;
             callback.startTimeNs = startTimeNs;
             callback.isMarker = isMarker;
-            callback.headerAndPayload = headersAndPayload;
-            callback.messageIdData = messageIdData;
-            callback.ledgerId = messageIdData == null ? -1 : 
messageIdData.getLedgerId();
-            callback.entryId = messageIdData == null ? -1 : 
messageIdData.getEntryId();
+            callback.ledgerId = position == null ? -1 : position.getLedgerId();
+            callback.entryId = position == null ? -1 : position.getEntryId();
             if (callback.propertyMap != null) {
                 callback.propertyMap.clear();
             }
@@ -579,24 +571,21 @@ public class Producer {
         }
 
         static MessagePublishContext get(Producer producer, long 
lowestSequenceId, long highestSequenceId, Rate rateIn,
-                                         ByteBuf headersAndPayload, long 
batchSize, boolean chunked, long startTimeNs,
-                                         boolean isMarker, MessageIdData 
messageIdData) {
+                int msgSize, long batchSize, boolean chunked, long 
startTimeNs, boolean isMarker, Position position) {
             MessagePublishContext callback = RECYCLER.get();
             callback.producer = producer;
             callback.sequenceId = lowestSequenceId;
             callback.highestSequenceId = highestSequenceId;
             callback.rateIn = rateIn;
-            callback.msgSize = headersAndPayload.readableBytes();
+            callback.msgSize = msgSize;
             callback.batchSize = batchSize;
             callback.originalProducerName = null;
             callback.originalSequenceId = -1L;
             callback.startTimeNs = startTimeNs;
             callback.chunked = chunked;
             callback.isMarker = isMarker;
-            callback.headerAndPayload = headersAndPayload;
-            callback.messageIdData = messageIdData;
-            callback.ledgerId = messageIdData == null ? -1 : 
messageIdData.getLedgerId();
-            callback.entryId = messageIdData == null ? -1 : 
messageIdData.getEntryId();
+            callback.ledgerId = position == null ? -1 : position.getLedgerId();
+            callback.entryId = position == null ? -1 : position.getEntryId();
             if (callback.propertyMap != null) {
                 callback.propertyMap.clear();
             }
@@ -618,16 +607,6 @@ public class Producer {
             return isMarker;
         }
 
-        @Override
-        public MessageIdData getMessageIdData() {
-            return messageIdData;
-        }
-
-        @Override
-        public ByteBuf getHeaderAndPayload() {
-            return headerAndPayload;
-        }
-
         private final Handle<MessagePublishContext> recyclerHandle;
 
         private MessagePublishContext(Handle<MessagePublishContext> 
recyclerHandle) {
@@ -654,8 +633,6 @@ public class Producer {
             startTimeNs = -1L;
             chunked = false;
             isMarker = false;
-            messageIdData = null;
-            headerAndPayload = null;
             if (propertyMap != null) {
                 propertyMap.clear();
             }
@@ -834,7 +811,7 @@ public class Producer {
         }
         MessagePublishContext messagePublishContext =
                 MessagePublishContext.get(this, sequenceId, highSequenceId, 
msgIn,
-                        headersAndPayload, batchSize, isChunked, 
System.nanoTime(), isMarker, null);
+                        headersAndPayload.readableBytes(), batchSize, 
isChunked, System.nanoTime(), isMarker, null);
         if (brokerInterceptor != null) {
             brokerInterceptor
                     .onMessagePublish(this, headersAndPayload, 
messagePublishContext);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index d904a0547a5..0517fff0f03 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1752,15 +1752,17 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             return;
         }
 
-        MessageIdData messageIdData = send.hasMessageId() ? 
send.getMessageId() : null;
+        // This position is only used for shadow replicator
+        Position position = send.hasMessageId()
+                ? PositionImpl.get(send.getMessageId().getLedgerId(), 
send.getMessageId().getEntryId()) : null;
 
         // Persist the message
         if (send.hasHighestSequenceId() && send.getSequenceId() <= 
send.getHighestSequenceId()) {
             producer.publishMessage(send.getProducerId(), 
send.getSequenceId(), send.getHighestSequenceId(),
-                    headersAndPayload, send.getNumMessages(), 
send.isIsChunk(), send.isMarker(), messageIdData);
+                    headersAndPayload, send.getNumMessages(), 
send.isIsChunk(), send.isMarker(), position);
         } else {
             producer.publishMessage(send.getProducerId(), 
send.getSequenceId(), headersAndPayload,
-                    send.getNumMessages(), send.isIsChunk(), send.isMarker(), 
messageIdData);
+                    send.getNumMessages(), send.isIsChunk(), send.isMarker(), 
position);
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index d38329c9516..7657d77e129 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -35,7 +35,6 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
-import org.apache.pulsar.common.api.proto.MessageIdData;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 import org.apache.pulsar.common.policies.data.EntryFilters;
@@ -128,14 +127,6 @@ public interface Topic {
         default void setEntryTimestamp(long entryTimestamp) {
 
         }
-
-        default MessageIdData getMessageIdData() {
-            return null;
-        }
-
-        default ByteBuf getHeaderAndPayload() {
-            return null;
-        }
     }
 
     CompletableFuture<Void> initialize();

Reply via email to