rvais commented on code in PR #4472:
URL: https://github.com/apache/activemq-artemis/pull/4472#discussion_r1194106917
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -241,28 +247,64 @@ public void rebuild() throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("reading message for rebuild cursor on
address={}, pg={}, messageNR={}, routedQueues={}, message={}, queueLIst={}",
pgStore.getAddress(), msg.getPageNumber(), msg.getMessageNumber(),
routedQueues, msg, routedQueues);
}
+
+ PageTransactionInfo txInfo = null;
+
+ if (msg.getTransactionID() > 0) {
+ txInfo = transactions.get(msg.getTransactionID());
+ }
+
+ Transaction preparedTX = txInfo == null ? null :
txInfo.getPreparedTransaction();
+
+ if (logger.isTraceEnabled()) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("lookup on {}, tx={}, preparedTX={}",
msg.getTransactionID(), txInfo, preparedTX);
+ }
+ }
+
for (long queueID : routedQueues) {
boolean ok = !isACK(queueID, msg.getPageNumber(),
msg.getMessageNumber());
- boolean txOK = msg.getTransactionID() <= 0 || transactions
== null || transactions.contains(msg.getTransactionID());
+ // if the pageTransaction is in prepare state, we have to
increment the counter after the commit
+ // notice that there is a check if the commit is done in
afterCommit
+ if (preparedTX != null) {
+ PageSubscription subscription =
pgStore.getCursorProvider().getSubscription(queueID);
+ preparedTX.addOperation(new
TransactionOperationAbstract() {
+ @Override
+ public void afterCommit(Transaction tx) {
+ // We use the pagingManager executor here, in case
the commit happened while the rebuild manager is working
+ // on that case the increment will wait any pending
tasks on that executor to finish before this executor takes effect
Review Comment:
Typo: I believe this should read "in that case". Since it is only in a
comment, it is not that important.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -241,28 +247,64 @@ public void rebuild() throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("reading message for rebuild cursor on
address={}, pg={}, messageNR={}, routedQueues={}, message={}, queueLIst={}",
pgStore.getAddress(), msg.getPageNumber(), msg.getMessageNumber(),
routedQueues, msg, routedQueues);
}
+
+ PageTransactionInfo txInfo = null;
+
+ if (msg.getTransactionID() > 0) {
+ txInfo = transactions.get(msg.getTransactionID());
+ }
+
+ Transaction preparedTX = txInfo == null ? null :
txInfo.getPreparedTransaction();
+
+ if (logger.isTraceEnabled()) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("lookup on {}, tx={}, preparedTX={}",
msg.getTransactionID(), txInfo, preparedTX);
+ }
+ }
+
for (long queueID : routedQueues) {
boolean ok = !isACK(queueID, msg.getPageNumber(),
msg.getMessageNumber());
- boolean txOK = msg.getTransactionID() <= 0 || transactions
== null || transactions.contains(msg.getTransactionID());
+ // if the pageTransaction is in prepare state, we have to
increment the counter after the commit
+ // notice that there is a check if the commit is done in
afterCommit
+ if (preparedTX != null) {
+ PageSubscription subscription =
pgStore.getCursorProvider().getSubscription(queueID);
+ preparedTX.addOperation(new
TransactionOperationAbstract() {
+ @Override
+ public void afterCommit(Transaction tx) {
+ // We use the pagingManager executor here, in case
the commit happened while the rebuild manager is working
+ // on that case the increment will wait any pending
tasks on that executor to finish before this executor takes effect
+ pagingManager.execute(() -> {
+ try {
+ subscription.getCounter().increment(null, 1,
msg.getStoredSize());
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ });
+ }
+ });
- if (!txOK) {
- logger.debug("TX is not ok for {}", msg);
- }
+ } else {
+ boolean txOK = msg.getTransactionID() <= 0 ||
transactions == null || txInfo != null;
Review Comment:
The name "txOK" is a bit confusing to me in this context.
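
To illustrate what the expression seems to check, here is a minimal sketch that pulls the three conditions into a named helper. The helper name `isNonTransactionalOrKnown`, the class name, and the plain `java.util.Map` stand-in for `transactions` are all hypothetical and not taken from the PR; the real collection type may differ.

```java
import java.util.HashMap;
import java.util.Map;

public class TxCheckSketch {

    // Hypothetical helper mirroring the three conditions of "txOK" in the diff.
    // The name and the Map type are assumptions made for illustration only.
    static boolean isNonTransactionalOrKnown(long transactionID, Map<Long, ?> transactions) {
        if (transactionID <= 0) {
            return true; // the message was not paged as part of a transaction
        }
        if (transactions == null) {
            return true; // no transaction data available to check against
        }
        return transactions.get(transactionID) != null; // the transaction was found
    }

    public static void main(String[] args) {
        Map<Long, String> transactions = new HashMap<>();
        transactions.put(5L, "txInfo-5");

        System.out.println(isNonTransactionalOrKnown(-1L, transactions)); // true
        System.out.println(isNonTransactionalOrKnown(5L, transactions));  // true
        System.out.println(isNonTransactionalOrKnown(9L, transactions));  // false
        System.out.println(isNonTransactionalOrKnown(9L, null));          // true
    }
}
```

A name that states the condition (rather than "OK") might make the `else` branch easier to follow, but any such rename is only a suggestion.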
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -241,28 +247,64 @@ public void rebuild() throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("reading message for rebuild cursor on
address={}, pg={}, messageNR={}, routedQueues={}, message={}, queueLIst={}",
pgStore.getAddress(), msg.getPageNumber(), msg.getMessageNumber(),
routedQueues, msg, routedQueues);
}
+
+ PageTransactionInfo txInfo = null;
+
+ if (msg.getTransactionID() > 0) {
+ txInfo = transactions.get(msg.getTransactionID());
+ }
+
+ Transaction preparedTX = txInfo == null ? null :
txInfo.getPreparedTransaction();
+
+ if (logger.isTraceEnabled()) {
Review Comment:
The same condition, "logger.isTraceEnabled()", is nested twice here; a
single check is enough.
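
For reference, a minimal sketch of the collapsed guard. `RecordingLogger` is a hypothetical stand-in for the SLF4J logger used in the PR (it only records what would be logged and counts guard checks), and `logLookup` is an illustrative method name, not code from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;

public class TraceGuardSketch {

    // Hypothetical stand-in for the real logger; not the SLF4J API itself.
    static final class RecordingLogger {
        private final boolean traceEnabled;
        final List<String> lines = new ArrayList<>();
        int guardChecks = 0;

        RecordingLogger(boolean traceEnabled) {
            this.traceEnabled = traceEnabled;
        }

        boolean isTraceEnabled() {
            guardChecks++; // count how often the guard is evaluated
            return traceEnabled;
        }

        void trace(String format, Object... args) {
            // Naive "{}" substitution, just enough for this sketch.
            String out = format;
            for (Object arg : args) {
                out = out.replaceFirst("\\{\\}", Matcher.quoteReplacement(String.valueOf(arg)));
            }
            lines.add(out);
        }
    }

    // One guard around the trace call instead of two nested identical ones.
    static void logLookup(RecordingLogger logger, long txId, Object txInfo, Object preparedTX) {
        if (logger.isTraceEnabled()) {
            logger.trace("lookup on {}, tx={}, preparedTX={}", txId, txInfo, preparedTX);
        }
    }

    public static void main(String[] args) {
        RecordingLogger enabled = new RecordingLogger(true);
        logLookup(enabled, 7L, "txInfo", null);
        System.out.println(enabled.lines.get(0)); // lookup on 7, tx=txInfo, preparedTX=null
        System.out.println(enabled.guardChecks);  // 1
    }
}
```

Behavior is unchanged compared with the nested version; the guard is simply evaluated once per call.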
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.