codelipenghui commented on a change in pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478#discussion_r776647452



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -217,7 +224,7 @@ public boolean readCompacted() {
     public Future<Void> sendMessages(final List<Entry> entries, EntryBatchSizes batchSizes,

Review comment:
       Adding an overloaded method here can avoid many changes:
   
   ```java
   public Future<Void> sendMessages(final List<Entry> entries, EntryBatchSizes batchSizes,
                                    EntryBatchIndexesAcks batchIndexesAcks,
                                    int totalMessages, long totalBytes, long totalChunkedMessages,
                                    RedeliveryTracker redeliveryTracker) {
       // Delegate to the epoch-aware overload so existing callers need no changes.
       return sendMessages(entries, batchSizes, batchIndexesAcks, totalMessages,
               totalBytes, totalChunkedMessages, redeliveryTracker, this.consumerEpoch);
   }
   ```

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -284,21 +284,35 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
     // Must be called from the internalPinnedExecutor thread
     private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) {
         checkArgument(message instanceof MessageImpl);
-        TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(consumer.getTopic(),
-                consumer.getTopicNameWithoutPartition(), message, consumer);
 
-        if (log.isDebugEnabled()) {
-            log.debug("[{}][{}] Received message from topics-consumer {}",
-                    topic, subscription, message.getMessageId());
-        }
+        // Mutually exclusive with redeliver to prevent the message
+        //from being cleared after being added to the incomingMessages
+        lock.readLock().lock();

Review comment:
       We could add the epoch check only when polling a message out of the 
receiver queue. The queue might still hold messages with a lower epoch, but 
they would be skipped when polled out of the queue.
   
   I think that would simplify the implementation.
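
To make the suggestion concrete, here is a minimal sketch of filtering by epoch at poll time. It is not Pulsar's actual code: `EpochFilteredQueue`, `EpochMessage`, and `bumpEpoch` are hypothetical names used only for illustration, and the real receiver queue and epoch handling in `ConsumerImpl` may differ.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, simplified stand-in for a consumer's receiver queue; not Pulsar's actual classes.
final class EpochFilteredQueue {

    // A received message tagged with the consumer epoch it was dispatched under.
    static final class EpochMessage {
        final long epoch;
        final String payload;

        EpochMessage(long epoch, String payload) {
            this.epoch = epoch;
            this.payload = payload;
        }
    }

    private final Queue<EpochMessage> incoming = new ArrayDeque<>();
    private long currentEpoch = 0;

    // Enqueue without any epoch check; stale messages are allowed to sit in the queue.
    synchronized void add(EpochMessage message) {
        incoming.add(message);
    }

    // Called on redelivery: messages queued under older epochs become stale but are not cleared here.
    synchronized void bumpEpoch() {
        currentEpoch++;
    }

    // Return the next message whose epoch is current, dropping stale ones; null if none is left.
    synchronized EpochMessage poll() {
        EpochMessage message;
        while ((message = incoming.poll()) != null) {
            if (message.epoch >= currentEpoch) {
                return message;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        EpochFilteredQueue queue = new EpochFilteredQueue();
        queue.add(new EpochMessage(0, "stale"));
        queue.bumpEpoch();
        queue.add(new EpochMessage(1, "fresh"));
        EpochMessage next = queue.poll();
        System.out.println(next.payload); // prints "fresh"
        System.out.println(queue.poll()); // prints "null"
    }
}
```

With this shape the receive path needs no read lock around enqueueing, because correctness depends only on the check made at poll time.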

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -561,7 +562,16 @@ private void addAbortTxnRequest(TxnID txnId, Consumer consumer, long lowWaterMar
                         if (cumulativeAckOfTransaction.getKey().equals(txnId)) {
                             cumulativeAckOfTransaction = null;
                         }
-                        persistentSubscription.redeliverUnacknowledgedMessages(consumer);
+                        // pendingAck handle next pr will fix

Review comment:
       Can you add a //TODO here with a description of what we should do?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
##########
@@ -95,9 +97,10 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor,
             } else {
                 cursorPosition = (PositionImpl) cursor.getReadPosition();
             }
+            ReadEntriesCallBackWrapper wrapper = ReadEntriesCallBackWrapper.create(consumer, DEFAULT_READ_EPOCH);

Review comment:
       Can you add a //TODO here? The compacted topic also needs this fix.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1456,18 +1456,42 @@ protected void handleFlow(CommandFlow flow) {
     @Override
     protected void handleRedeliverUnacknowledged(CommandRedeliverUnacknowledgedMessages redeliver) {
         checkArgument(state == State.Connected);
+        CompletableFuture<Consumer> consumerFuture = consumers.get(redeliver.getConsumerId());
+        final boolean hasRequestId = redeliver.hasRequestId();
+        final long requestId = hasRequestId ? redeliver.getRequestId() : 0;
+        final long consumerId = redeliver.getConsumerId();
+
         if (log.isDebugEnabled()) {
-            log.debug("[{}] Received Resend Command from consumer {} ", remoteAddress, redeliver.getConsumerId());
+            log.debug("[{}] redeliverUnacknowledged from consumer {} , requestId {}, consumerEpoch {}",
+                    remoteAddress, redeliver.getConsumerId(), requestId, redeliver.getConsumerEpoch());
         }
 
-        CompletableFuture<Consumer> consumerFuture = consumers.get(redeliver.getConsumerId());
-
         if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
             Consumer consumer = consumerFuture.getNow(null);
             if (redeliver.getMessageIdsCount() > 0 && Subscription.isIndividualAckMode(consumer.subType())) {
                 consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList());
             } else {
-                consumer.redeliverUnacknowledgedMessages();
+                consumer.redeliverUnacknowledgedMessages(redeliver.getConsumerEpoch()).whenComplete((v, e) -> {
+                    if (hasRequestId) {
+                        if (e != null) {
+                            log.error("redeliverUnacknowledgedMessages error! "
+                                    + "consumerId : {}, requestId: {}", consumerId, requestId, e);
+                            ctx.writeAndFlush(Commands.newError(requestId,
+                                    BrokerServiceException.getClientErrorCode(e), e.getMessage()));
+                        } else {
+                            ctx.writeAndFlush(Commands.newSuccess(requestId));

Review comment:
       Is there any reason for introducing the redelivery response here? If it 
is not directly related to the consumer epoch introduced in PIP 84, we can 
split the change into two parts.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -217,7 +224,7 @@ public boolean readCompacted() {
     public Future<Void> sendMessages(final List<Entry> entries, EntryBatchSizes batchSizes,

Review comment:
       The protocol handlers also call this method, so keeping the existing 
signature as an overload avoids modifications in the protocol handlers as well.

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -269,7 +269,7 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
                 // thenAcceptAsync, there is no chance for recursion that would lead to stack overflow.
                 receiveMessageFromConsumer(consumer);
             }
-        }, internalPinnedExecutor).exceptionally(ex -> {
+        }, externalPinnedExecutor).exceptionally(ex -> {

Review comment:
       Why change to the external pinned executor? The `externalPinnedExecutor` 
is used to trigger message listeners.



