codelipenghui commented on a change in pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478#discussion_r776647452
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -217,7 +224,7 @@ public boolean readCompacted() {
public Future<Void> sendMessages(final List<Entry> entries,
EntryBatchSizes batchSizes,
Review comment:
Adding an overloaded method here can avoid many changes:
```java
// Keeps the old signature and delegates with the consumer's current epoch.
public Future<Void> sendMessages(final List<Entry> entries, EntryBatchSizes batchSizes,
                                 EntryBatchIndexesAcks batchIndexesAcks,
                                 int totalMessages, long totalBytes,
                                 long totalChunkedMessages,
                                 RedeliveryTracker redeliveryTracker) {
    return sendMessages(entries, batchSizes, batchIndexesAcks, totalMessages,
            totalBytes, totalChunkedMessages, redeliveryTracker, this.consumerEpoch);
}
```
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -284,21 +284,35 @@ private void receiveMessageFromConsumer(ConsumerImpl<T>
consumer) {
// Must be called from the internalPinnedExecutor thread
private void messageReceived(ConsumerImpl<T> consumer, Message<T> message)
{
checkArgument(message instanceof MessageImpl);
- TopicMessageImpl<T> topicMessage = new
TopicMessageImpl<>(consumer.getTopic(),
- consumer.getTopicNameWithoutPartition(), message, consumer);
- if (log.isDebugEnabled()) {
- log.debug("[{}][{}] Received message from topics-consumer {}",
- topic, subscription, message.getMessageId());
- }
+ // Mutually exclusive with redeliver to prevent the message
+ //from being cleared after being added to the incomingMessages
+ lock.readLock().lock();
Review comment:
We could add the check only when polling messages out of the receiver
queue. The queue might still hold messages with a lower epoch, but they would
be skipped when polled out of the queue.
I think that would simplify the implementation.
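A minimal sketch of the poll-time check, not the actual client code. `EpochFilteringQueue`, `EpochMessage`, `bumpEpoch` and the `>=` comparison are hypothetical names and choices made for illustration; the real consumer would tag messages with the epoch carried by the broker.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical illustration only; none of these names exist in the Pulsar client.
final class EpochFilteringQueue {
    /** A queued message tagged with the consumer epoch it was dispatched under. */
    static final class EpochMessage {
        final String payload;
        final long epoch;

        EpochMessage(String payload, long epoch) {
            this.payload = payload;
            this.epoch = epoch;
        }
    }

    private final Queue<EpochMessage> incoming = new ArrayDeque<>();
    private long currentEpoch = 0;

    /** Enqueue without any check; stale messages are allowed to sit in the queue. */
    synchronized void add(String payload, long epoch) {
        incoming.add(new EpochMessage(payload, epoch));
    }

    /** Called on redelivery: later polls ignore anything tagged with an older epoch. */
    synchronized long bumpEpoch() {
        return ++currentEpoch;
    }

    /** Returns the next payload whose epoch is current, or null if none remain. */
    synchronized String poll() {
        EpochMessage m;
        while ((m = incoming.poll()) != null) {
            if (m.epoch >= currentEpoch) {
                return m.payload;
            }
            // Lower epoch: drop it and keep looking.
        }
        return null;
    }

    public static void main(String[] args) {
        EpochFilteringQueue q = new EpochFilteringQueue();
        q.add("a", 0);
        q.add("b", 0);
        long epoch = q.bumpEpoch(); // simulate a redeliver request
        q.add("c", epoch);
        System.out.println(q.poll()); // prints "c"
        System.out.println(q.poll()); // prints "null"
    }
}
```

With this shape the enqueue path needs no lock shared with redeliver; only the poll path compares epochs.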
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -561,7 +562,16 @@ private void addAbortTxnRequest(TxnID txnId, Consumer
consumer, long lowWaterMar
if (cumulativeAckOfTransaction.getKey().equals(txnId))
{
cumulativeAckOfTransaction = null;
}
-
persistentSubscription.redeliverUnacknowledgedMessages(consumer);
+ // pendingAck handle next pr will fix
Review comment:
Can you add a `// TODO` here with a description of what we should do
here?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
##########
@@ -95,9 +97,10 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor,
} else {
cursorPosition = (PositionImpl) cursor.getReadPosition();
}
+ ReadEntriesCallBackWrapper wrapper =
ReadEntriesCallBackWrapper.create(consumer, DEFAULT_READ_EPOCH);
Review comment:
Can you add a `// TODO` here? The compacted topic also needs this fix.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1456,18 +1456,42 @@ protected void handleFlow(CommandFlow flow) {
@Override
protected void
handleRedeliverUnacknowledged(CommandRedeliverUnacknowledgedMessages redeliver)
{
checkArgument(state == State.Connected);
+ CompletableFuture<Consumer> consumerFuture =
consumers.get(redeliver.getConsumerId());
+ final boolean hasRequestId = redeliver.hasRequestId();
+ final long requestId = hasRequestId ? redeliver.getRequestId() : 0;
+ final long consumerId = redeliver.getConsumerId();
+
if (log.isDebugEnabled()) {
- log.debug("[{}] Received Resend Command from consumer {} ",
remoteAddress, redeliver.getConsumerId());
+ log.debug("[{}] redeliverUnacknowledged from consumer {} ,
requestId {}, consumerEpoch {}",
+ remoteAddress, redeliver.getConsumerId(), requestId,
redeliver.getConsumerEpoch());
}
- CompletableFuture<Consumer> consumerFuture =
consumers.get(redeliver.getConsumerId());
-
if (consumerFuture != null && consumerFuture.isDone() &&
!consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
if (redeliver.getMessageIdsCount() > 0 &&
Subscription.isIndividualAckMode(consumer.subType())) {
consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList());
} else {
- consumer.redeliverUnacknowledgedMessages();
+
consumer.redeliverUnacknowledgedMessages(redeliver.getConsumerEpoch()).whenComplete((v,
e) -> {
+ if (hasRequestId) {
+ if (e != null) {
+ log.error("redeliverUnacknowledgedMessages error! "
+ + "consumerId : {}, requestId: {}",
consumerId, requestId, e);
+ ctx.writeAndFlush(Commands.newError(requestId,
+
BrokerServiceException.getClientErrorCode(e), e.getMessage()));
+ } else {
+ ctx.writeAndFlush(Commands.newSuccess(requestId));
Review comment:
Any reason for introducing the redelivery response here? If it is not
directly related to the consumer epoch introduced in PIP 84, we can separate it
into 2 parts.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -217,7 +224,7 @@ public boolean readCompacted() {
public Future<Void> sendMessages(final List<Entry> entries,
EntryBatchSizes batchSizes,
Review comment:
The protocol handlers also call this method, so keeping an overload with
the old signature would also avoid modifications in the protocol handlers.
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -269,7 +269,7 @@ private void receiveMessageFromConsumer(ConsumerImpl<T>
consumer) {
// thenAcceptAsync, there is no chance for recursion that
would lead to stack overflow.
receiveMessageFromConsumer(consumer);
}
- }, internalPinnedExecutor).exceptionally(ex -> {
+ }, externalPinnedExecutor).exceptionally(ex -> {
Review comment:
Why change to the external pinned executor? The `externalPinnedExecutor`
is used to trigger message listeners.
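To illustrate the separation I have in mind, here is a rough sketch using plain `java.util.concurrent`. The class name, the thread names and the two single-threaded executors are assumptions for illustration and are not taken from `MultiTopicsConsumerImpl`: internal bookkeeping stays pinned to one thread, and only the user-facing callback runs on the other.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration; the executors here only mimic the roles of
// internalPinnedExecutor and externalPinnedExecutor.
final class PinnedExecutorSketch {
    /** Returns {thread that ran the internal step, thread that ran the listener step}. */
    static String[] run() throws Exception {
        ExecutorService internal = Executors.newSingleThreadExecutor(r -> new Thread(r, "internal"));
        ExecutorService external = Executors.newSingleThreadExecutor(r -> new Thread(r, "external"));
        String[] seen = new String[2];
        try {
            CompletableFuture.completedFuture("msg")
                    // Internal state changes (queueing, re-arming the receive loop).
                    .thenApplyAsync(m -> {
                        seen[0] = Thread.currentThread().getName();
                        return m;
                    }, internal)
                    // User callback: may be slow, so it must not occupy the internal thread.
                    .thenAcceptAsync(m -> seen[1] = Thread.currentThread().getName(), external)
                    .get();
        } finally {
            internal.shutdown();
            external.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) throws Exception {
        String[] seen = run();
        System.out.println(seen[0]); // prints "internal"
        System.out.println(seen[1]); // prints "external"
    }
}
```

If the receive-loop continuation moves to the listener executor, a slow listener could delay it, which is why I am asking about the change.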