This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 66c5e1f1ae4 Revert "[fix][broker] Miss headersAndPayload and
messageIdData in MessagePublishContext (#21245)" (#21309)
66c5e1f1ae4 is described below
commit 66c5e1f1ae48e49ed27a72424c372ce444217d86
Author: Penghui Li <[email protected]>
AuthorDate: Mon Oct 9 21:48:27 2023 +0800
Revert "[fix][broker] Miss headersAndPayload and messageIdData in
MessagePublishContext (#21245)" (#21309)
---
.../org/apache/pulsar/broker/service/Producer.java | 73 ++++++++--------------
.../apache/pulsar/broker/service/ServerCnx.java | 8 ++-
.../org/apache/pulsar/broker/service/Topic.java | 9 ---
3 files changed, 30 insertions(+), 60 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 70cca8b8212..f7d2bb2dd27 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -46,7 +46,6 @@ import
org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
-import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProducerAccessMode;
import org.apache.pulsar.common.api.proto.ServerError;
@@ -187,15 +186,14 @@ public class Producer {
}
public void publishMessage(long producerId, long sequenceId, ByteBuf
headersAndPayload, long batchSize,
- boolean isChunked, boolean isMarker, MessageIdData messageIdData) {
- if (checkAndStartPublish(producerId, sequenceId, headersAndPayload,
batchSize, messageIdData)) {
- publishMessageToTopic(headersAndPayload, sequenceId, batchSize,
isChunked, isMarker, messageIdData);
+ boolean isChunked, boolean isMarker, Position position) {
+ if (checkAndStartPublish(producerId, sequenceId, headersAndPayload,
batchSize, position)) {
+ publishMessageToTopic(headersAndPayload, sequenceId, batchSize,
isChunked, isMarker, position);
}
}
public void publishMessage(long producerId, long lowestSequenceId, long
highestSequenceId,
- ByteBuf headersAndPayload, long batchSize,
boolean isChunked, boolean isMarker,
- MessageIdData messageIdData) {
+ ByteBuf headersAndPayload, long batchSize, boolean isChunked,
boolean isMarker, Position position) {
if (lowestSequenceId > highestSequenceId) {
cnx.execute(() -> {
cnx.getCommandSender().sendSendError(producerId,
highestSequenceId, ServerError.MetadataError,
@@ -204,15 +202,15 @@ public class Producer {
});
return;
}
- if (checkAndStartPublish(producerId, highestSequenceId,
headersAndPayload, batchSize, messageIdData)) {
+ if (checkAndStartPublish(producerId, highestSequenceId,
headersAndPayload, batchSize, position)) {
publishMessageToTopic(headersAndPayload, lowestSequenceId,
highestSequenceId, batchSize, isChunked,
- isMarker, messageIdData);
+ isMarker, position);
}
}
public boolean checkAndStartPublish(long producerId, long sequenceId,
ByteBuf headersAndPayload, long batchSize,
- MessageIdData messageIdData) {
- if (!isShadowTopic && messageIdData != null) {
+ Position position) {
+ if (!isShadowTopic && position != null) {
cnx.execute(() -> {
cnx.getCommandSender().sendSendError(producerId, sequenceId,
ServerError.NotAllowedError,
"Only shadow topic supports sending messages with
messageId");
@@ -220,7 +218,7 @@ public class Producer {
});
return false;
}
- if (isShadowTopic && messageIdData == null) {
+ if (isShadowTopic && position == null) {
cnx.execute(() -> {
cnx.getCommandSender().sendSendError(producerId, sequenceId,
ServerError.NotAllowedError,
"Cannot send messages to a shadow topic");
@@ -269,10 +267,10 @@ public class Producer {
}
private void publishMessageToTopic(ByteBuf headersAndPayload, long
sequenceId, long batchSize, boolean isChunked,
- boolean isMarker, MessageIdData
messageIdData) {
+ boolean isMarker, Position position) {
MessagePublishContext messagePublishContext =
- MessagePublishContext.get(this, sequenceId, msgIn,
headersAndPayload,
- batchSize, isChunked, System.nanoTime(), isMarker,
messageIdData);
+ MessagePublishContext.get(this, sequenceId, msgIn,
headersAndPayload.readableBytes(),
+ batchSize, isChunked, System.nanoTime(), isMarker,
position);
if (brokerInterceptor != null) {
brokerInterceptor
.onMessagePublish(this, headersAndPayload,
messagePublishContext);
@@ -281,11 +279,10 @@ public class Producer {
}
private void publishMessageToTopic(ByteBuf headersAndPayload, long
lowestSequenceId, long highestSequenceId,
- long batchSize, boolean isChunked,
boolean isMarker,
- MessageIdData messageIdData) {
+ long batchSize, boolean isChunked,
boolean isMarker, Position position) {
MessagePublishContext messagePublishContext =
MessagePublishContext.get(this, lowestSequenceId,
- highestSequenceId, msgIn, headersAndPayload, batchSize,
- isChunked, System.nanoTime(), isMarker, messageIdData);
+ highestSequenceId, msgIn, headersAndPayload.readableBytes(),
batchSize,
+ isChunked, System.nanoTime(), isMarker, position);
if (brokerInterceptor != null) {
brokerInterceptor
.onMessagePublish(this, headersAndPayload,
messagePublishContext);
@@ -376,8 +373,6 @@ public class Producer {
private long sequenceId;
private long ledgerId;
private long entryId;
- private MessageIdData messageIdData;
- private ByteBuf headerAndPayload;
private Rate rateIn;
private int msgSize;
private long batchSize;
@@ -554,24 +549,21 @@ public class Producer {
recycle();
}
- static MessagePublishContext get(Producer producer, long sequenceId,
Rate rateIn, ByteBuf headersAndPayload,
- long batchSize, boolean chunked, long
startTimeNs, boolean isMarker,
- MessageIdData messageIdData) {
+ static MessagePublishContext get(Producer producer, long sequenceId,
Rate rateIn, int msgSize,
+ long batchSize, boolean chunked, long startTimeNs, boolean
isMarker, Position position) {
MessagePublishContext callback = RECYCLER.get();
callback.producer = producer;
callback.sequenceId = sequenceId;
callback.rateIn = rateIn;
- callback.msgSize = headersAndPayload.readableBytes();
+ callback.msgSize = msgSize;
callback.batchSize = batchSize;
callback.chunked = chunked;
callback.originalProducerName = null;
callback.originalSequenceId = -1L;
callback.startTimeNs = startTimeNs;
callback.isMarker = isMarker;
- callback.headerAndPayload = headersAndPayload;
- callback.messageIdData = messageIdData;
- callback.ledgerId = messageIdData == null ? -1 :
messageIdData.getLedgerId();
- callback.entryId = messageIdData == null ? -1 :
messageIdData.getEntryId();
+ callback.ledgerId = position == null ? -1 : position.getLedgerId();
+ callback.entryId = position == null ? -1 : position.getEntryId();
if (callback.propertyMap != null) {
callback.propertyMap.clear();
}
@@ -579,24 +571,21 @@ public class Producer {
}
static MessagePublishContext get(Producer producer, long
lowestSequenceId, long highestSequenceId, Rate rateIn,
- ByteBuf headersAndPayload, long
batchSize, boolean chunked, long startTimeNs,
- boolean isMarker, MessageIdData
messageIdData) {
+ int msgSize, long batchSize, boolean chunked, long
startTimeNs, boolean isMarker, Position position) {
MessagePublishContext callback = RECYCLER.get();
callback.producer = producer;
callback.sequenceId = lowestSequenceId;
callback.highestSequenceId = highestSequenceId;
callback.rateIn = rateIn;
- callback.msgSize = headersAndPayload.readableBytes();
+ callback.msgSize = msgSize;
callback.batchSize = batchSize;
callback.originalProducerName = null;
callback.originalSequenceId = -1L;
callback.startTimeNs = startTimeNs;
callback.chunked = chunked;
callback.isMarker = isMarker;
- callback.headerAndPayload = headersAndPayload;
- callback.messageIdData = messageIdData;
- callback.ledgerId = messageIdData == null ? -1 :
messageIdData.getLedgerId();
- callback.entryId = messageIdData == null ? -1 :
messageIdData.getEntryId();
+ callback.ledgerId = position == null ? -1 : position.getLedgerId();
+ callback.entryId = position == null ? -1 : position.getEntryId();
if (callback.propertyMap != null) {
callback.propertyMap.clear();
}
@@ -618,16 +607,6 @@ public class Producer {
return isMarker;
}
- @Override
- public MessageIdData getMessageIdData() {
- return messageIdData;
- }
-
- @Override
- public ByteBuf getHeaderAndPayload() {
- return headerAndPayload;
- }
-
private final Handle<MessagePublishContext> recyclerHandle;
private MessagePublishContext(Handle<MessagePublishContext>
recyclerHandle) {
@@ -654,8 +633,6 @@ public class Producer {
startTimeNs = -1L;
chunked = false;
isMarker = false;
- messageIdData = null;
- headerAndPayload = null;
if (propertyMap != null) {
propertyMap.clear();
}
@@ -834,7 +811,7 @@ public class Producer {
}
MessagePublishContext messagePublishContext =
MessagePublishContext.get(this, sequenceId, highSequenceId,
msgIn,
- headersAndPayload, batchSize, isChunked,
System.nanoTime(), isMarker, null);
+ headersAndPayload.readableBytes(), batchSize,
isChunked, System.nanoTime(), isMarker, null);
if (brokerInterceptor != null) {
brokerInterceptor
.onMessagePublish(this, headersAndPayload,
messagePublishContext);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index d904a0547a5..0517fff0f03 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1752,15 +1752,17 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return;
}
- MessageIdData messageIdData = send.hasMessageId() ?
send.getMessageId() : null;
+ // This position is only used for shadow replicator
+ Position position = send.hasMessageId()
+ ? PositionImpl.get(send.getMessageId().getLedgerId(),
send.getMessageId().getEntryId()) : null;
// Persist the message
if (send.hasHighestSequenceId() && send.getSequenceId() <=
send.getHighestSequenceId()) {
producer.publishMessage(send.getProducerId(),
send.getSequenceId(), send.getHighestSequenceId(),
- headersAndPayload, send.getNumMessages(),
send.isIsChunk(), send.isMarker(), messageIdData);
+ headersAndPayload, send.getNumMessages(),
send.isIsChunk(), send.isMarker(), position);
} else {
producer.publishMessage(send.getProducerId(),
send.getSequenceId(), headersAndPayload,
- send.getNumMessages(), send.isIsChunk(), send.isMarker(),
messageIdData);
+ send.getNumMessages(), send.isIsChunk(), send.isMarker(),
position);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index d38329c9516..7657d77e129 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -35,7 +35,6 @@ import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
-import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.EntryFilters;
@@ -128,14 +127,6 @@ public interface Topic {
default void setEntryTimestamp(long entryTimestamp) {
}
-
- default MessageIdData getMessageIdData() {
- return null;
- }
-
- default ByteBuf getHeaderAndPayload() {
- return null;
- }
}
CompletableFuture<Void> initialize();