clebertsuconic commented on code in PR #4472:
URL: https://github.com/apache/activemq-artemis/pull/4472#discussion_r1194234114


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -241,28 +247,64 @@ public void rebuild() throws Exception {
                if (logger.isTraceEnabled()) {
                   logger.trace("reading message for rebuild cursor on 
address={}, pg={}, messageNR={}, routedQueues={}, message={}, queueLIst={}", 
pgStore.getAddress(), msg.getPageNumber(), msg.getMessageNumber(), 
routedQueues, msg, routedQueues);
                }
+
+               PageTransactionInfo txInfo = null;
+
+               if (msg.getTransactionID() > 0) {
+                  txInfo = transactions.get(msg.getTransactionID());
+               }
+
+               Transaction preparedTX = txInfo == null ? null : 
txInfo.getPreparedTransaction();
+
+               if (logger.isTraceEnabled()) {
+                  if (logger.isTraceEnabled()) {
+                     logger.trace("lookup on {}, tx={}, preparedTX={}", 
msg.getTransactionID(), txInfo, preparedTX);
+                  }
+               }
+
                for (long queueID : routedQueues) {
                   boolean ok = !isACK(queueID, msg.getPageNumber(), 
msg.getMessageNumber());
 
-                  boolean txOK = msg.getTransactionID() <= 0 || transactions 
== null || transactions.contains(msg.getTransactionID());
+                  // if the pageTransaction is in prepare state, we have to 
increment the counter after the commit
+                  // notice that there is a check if the commit is done in 
afterCommit
+                  if (preparedTX != null) {
+                     PageSubscription subscription = 
pgStore.getCursorProvider().getSubscription(queueID);
+                     preparedTX.addOperation(new 
TransactionOperationAbstract() {
+                        @Override
+                        public void afterCommit(Transaction tx) {
+                           // We use the pagingManager executor here, in case 
the commit happened while the rebuild manager is working
+                           // on that case the increment will wait any pending 
tasks on that executor to finish before this executor takes effect
+                           pagingManager.execute(() -> {
+                              try {
+                                 subscription.getCounter().increment(null, 1, 
msg.getStoredSize());
+                              } catch (Exception e) {
+                                 logger.warn(e.getMessage(), e);
+                              }
+                           });
+                        }
+                     });
 
-                  if (!txOK) {
-                     logger.debug("TX is not ok for {}", msg);
-                  }
+                  } else {
+                     boolean txOK = msg.getTransactionID() <= 0 || 
transactions == null || txInfo != null;

Review Comment:
   I agree, although there was no actual change to that part.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to