rvais commented on code in PR #4472:
URL: https://github.com/apache/activemq-artemis/pull/4472#discussion_r1194106917


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -241,28 +247,64 @@ public void rebuild() throws Exception {
                if (logger.isTraceEnabled()) {
                   logger.trace("reading message for rebuild cursor on address={}, pg={}, messageNR={}, routedQueues={}, message={}, queueLIst={}", pgStore.getAddress(), msg.getPageNumber(), msg.getMessageNumber(), routedQueues, msg, routedQueues);
                }
+
+               PageTransactionInfo txInfo = null;
+
+               if (msg.getTransactionID() > 0) {
+                  txInfo = transactions.get(msg.getTransactionID());
+               }
+
+               Transaction preparedTX = txInfo == null ? null : txInfo.getPreparedTransaction();
+
+               if (logger.isTraceEnabled()) {
+                  if (logger.isTraceEnabled()) {
+                     logger.trace("lookup on {}, tx={}, preparedTX={}", msg.getTransactionID(), txInfo, preparedTX);
+                  }
+               }
+
                for (long queueID : routedQueues) {
                   boolean ok = !isACK(queueID, msg.getPageNumber(), msg.getMessageNumber());
 
-                  boolean txOK = msg.getTransactionID() <= 0 || transactions == null || transactions.contains(msg.getTransactionID());
+                  // if the pageTransaction is in prepare state, we have to increment the counter after the commit
+                  // notice that there is a check if the commit is done in afterCommit
+                  if (preparedTX != null) {
+                     PageSubscription subscription = pgStore.getCursorProvider().getSubscription(queueID);
+                     preparedTX.addOperation(new TransactionOperationAbstract() {
+                        @Override
+                        public void afterCommit(Transaction tx) {
+                           // We use the pagingManager executor here, in case the commit happened while the rebuild manager is working
+                           // on that case the increment will wait any pending tasks on that executor to finish before this executor takes effect

Review Comment:
   Typo: I believe this should read "in that case". Since it's only in a
   comment, it is not that important.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -241,28 +247,64 @@ public void rebuild() throws Exception {
                if (logger.isTraceEnabled()) {
                   logger.trace("reading message for rebuild cursor on address={}, pg={}, messageNR={}, routedQueues={}, message={}, queueLIst={}", pgStore.getAddress(), msg.getPageNumber(), msg.getMessageNumber(), routedQueues, msg, routedQueues);
                }
+
+               PageTransactionInfo txInfo = null;
+
+               if (msg.getTransactionID() > 0) {
+                  txInfo = transactions.get(msg.getTransactionID());
+               }
+
+               Transaction preparedTX = txInfo == null ? null : txInfo.getPreparedTransaction();
+
+               if (logger.isTraceEnabled()) {
+                  if (logger.isTraceEnabled()) {
+                     logger.trace("lookup on {}, tx={}, preparedTX={}", msg.getTransactionID(), txInfo, preparedTX);
+                  }
+               }
+
                for (long queueID : routedQueues) {
                   boolean ok = !isACK(queueID, msg.getPageNumber(), msg.getMessageNumber());
 
-                  boolean txOK = msg.getTransactionID() <= 0 || transactions == null || transactions.contains(msg.getTransactionID());
+                  // if the pageTransaction is in prepare state, we have to increment the counter after the commit
+                  // notice that there is a check if the commit is done in afterCommit
+                  if (preparedTX != null) {
+                     PageSubscription subscription = pgStore.getCursorProvider().getSubscription(queueID);
+                     preparedTX.addOperation(new TransactionOperationAbstract() {
+                        @Override
+                        public void afterCommit(Transaction tx) {
+                           // We use the pagingManager executor here, in case the commit happened while the rebuild manager is working
+                           // on that case the increment will wait any pending tasks on that executor to finish before this executor takes effect
+                           pagingManager.execute(() -> {
+                              try {
+                                 subscription.getCounter().increment(null, 1, msg.getStoredSize());
+                              } catch (Exception e) {
+                                 logger.warn(e.getMessage(), e);
+                              }
+                           });
+                        }
+                     });
 
-                  if (!txOK) {
-                     logger.debug("TX is not ok for {}", msg);
-                  }
+                  } else {
+                     boolean txOK = msg.getTransactionID() <= 0 || transactions == null || txInfo != null;

Review Comment:
   "txOK" is a bit confusing to me in the given context.
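
   One possible way to make the intent easier to read, as a rough sketch
   only: name each of the three sub-conditions before combining them. The
   class name, the helper name, and the Map-like type of "transactions" are
   assumptions made for illustration; they are not taken from the PR.

```java
import java.util.Map;

final class TxCheckSketch {
    // Hypothetical helper; it mirrors the condition from the diff:
    //   msg.getTransactionID() <= 0 || transactions == null || txInfo != null
    // The parameter types are assumptions, since the diff does not show them.
    static boolean isNonTransactionalOrTxKnown(long transactionID,
                                               Map<Long, ?> transactions,
                                               Object txInfo) {
        boolean nonTransactional = transactionID <= 0;   // message was not sent in a page transaction
        boolean noTransactionData = transactions == null; // no transaction lookup is available
        boolean txRecordFound = txInfo != null;           // the lookup returned a record
        return nonTransactional || noTransactionData || txRecordFound;
    }
}
```

   The behaviour is meant to be identical to the existing expression; only
   the naming changes.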



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -241,28 +247,64 @@ public void rebuild() throws Exception {
                if (logger.isTraceEnabled()) {
                   logger.trace("reading message for rebuild cursor on address={}, pg={}, messageNR={}, routedQueues={}, message={}, queueLIst={}", pgStore.getAddress(), msg.getPageNumber(), msg.getMessageNumber(), routedQueues, msg, routedQueues);
                }
+
+               PageTransactionInfo txInfo = null;
+
+               if (msg.getTransactionID() > 0) {
+                  txInfo = transactions.get(msg.getTransactionID());
+               }
+
+               Transaction preparedTX = txInfo == null ? null : txInfo.getPreparedTransaction();
+
+               if (logger.isTraceEnabled()) {

Review Comment:
   The same condition, "logger.isTraceEnabled()", is nested inside itself
   here; one of the two checks is redundant.
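
   A minimal sketch of the collapsed form, assuming nothing else depends on
   the inner block. TraceLog and RecordingLog are hypothetical stand-ins for
   the SLF4J-style logger used in the PR, included only so the example
   compiles on its own; traceLookup is likewise an illustrative name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the logger in the PR (not the real logging API).
interface TraceLog {
    boolean isTraceEnabled();
    void trace(String format, Object... args);
}

// Hypothetical logger that records what it was asked to log.
final class RecordingLog implements TraceLog {
    final List<String> lines = new ArrayList<>();
    private final boolean enabled;

    RecordingLog(boolean enabled) {
        this.enabled = enabled;
    }

    @Override
    public boolean isTraceEnabled() {
        return enabled;
    }

    @Override
    public void trace(String format, Object... args) {
        lines.add(format + " " + Arrays.toString(args));
    }
}

final class TraceGuardSketch {
    // A single guard is enough: the inner check in the diff repeats the outer one.
    static void traceLookup(TraceLog logger, long transactionID, Object txInfo, Object preparedTX) {
        if (logger.isTraceEnabled()) {
            logger.trace("lookup on {}, tx={}, preparedTX={}", transactionID, txInfo, preparedTX);
        }
    }
}
```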



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
