gemmellr commented on code in PR #5172:
URL: https://github.com/apache/activemq-artemis/pull/5172#discussion_r1732732990


##########
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java:
##########
@@ -233,6 +233,9 @@ private synchronized void store(MapRecord<K, V> record) {
 
    // callers must be synchronized
    private void removed(MapRecord<K, V> record) {
+      if (logger.isTraceEnabled()) {
+         logger.info("Removing record {}", record, new Exception("trace"));

Review Comment:
   Level mismatch between the gate and the actual logging: the gate checks 
isTraceEnabled() but the call is logger.info().
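
   A minimal sketch of keeping the gate and the log call on the same level. 
   It uses java.util.logging only so that it runs without dependencies; the 
   class and method names are hypothetical. With the SLF4J-style logger used 
   in this PR, the equivalent pairing would be isTraceEnabled() with 
   logger.trace(...).

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical demo class, not part of the PR.
public class GateLevelSketch {
   private static final Logger logger = Logger.getLogger(GateLevelSketch.class.getName());

   // Logs the removal at FINEST, the same level the gate checks.
   // Returns true when the message was handed to the logger.
   static boolean logRemoval(Object record) {
      if (logger.isLoggable(Level.FINEST)) {
         // The exception is only there to capture the caller's stack trace.
         logger.log(Level.FINEST, "Removing record " + record, new Exception("trace"));
         return true;
      }
      return false;
   }
}
```

   Because the gate and the call agree, the exception is only allocated 
   when the message will actually be accepted by the logger.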



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java:
##########
@@ -260,12 +268,12 @@ private void validateExpireSet(long queueID, 
JournalHashMap<AckRetry, AckRetry,
          if (retry.getQueueAttempts() >= 
configuration.getMirrorAckManagerQueueAttempts()) {
             if (retry.attemptedPage() >= 
configuration.getMirrorAckManagerPageAttempts()) {
                if (logger.isDebugEnabled()) {
-                  logger.debug("Retried {} {} times, giving up on the entry 
now", retry, retry.getPageAttempts());
+                  logger.debug("Retried {} {} times, giving up on the entry 
now. Configuration Page Attempts={}", retry, retry.getPageAttempts(), 
configuration.getMirrorAckManagerPageAttempts());
                }
                retries.remove(retry);
             } else {
                if (logger.isDebugEnabled()) {
-                  logger.trace("Retry {} attempted {} times on paging", retry, 
retry.getPageAttempts());
+                  logger.trace("Retry {} attempted {} times on paging, 
Configuration Page Attempts={}", retry, retry.getPageAttempts(), 
configuration.getMirrorAckManagerPageAttempts());

Review Comment:
   Also a (pre-existing) level mismatch between the gate and the actual log 
method: the gate checks isDebugEnabled() but the call is logger.trace().



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java:
##########
@@ -35,7 +36,7 @@ public void executeOnCompletion(final IOCallback runnable) {
    }
 
    @Override
-   public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
+   public void executeOnCompletion(IOCallback runnable, 
OperationConsistencyLevel operationType) {

Review Comment:
   operationType -> consistencyLevel



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java:
##########
@@ -477,8 +480,11 @@ private boolean sendMessage(Message message, 
DeliveryAnnotations deliveryAnnotat
          message.setAddress(internalAddress);
       }
 
+      // notice that MirrorTransaction is overriding getRequiredConsistency 
that is being set to ignore Replication.
+      // that means in case the target server is using replication, we will 
not wait for a roundtrip before the message is sent
+      // however we will wait the roundtrip before acking the message
+      // This is to alleviate a situation where messages would take too long 
to be delivered and be ready for ack

Review Comment:
   MirrorTransaction itself would probably benefit from some Javadoc to make 
this more discoverable to anyone using it later.
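
   A rough sketch of what such Javadoc could look like, based only on the 
   code comment quoted above. The enum and the base class here are 
   stand-ins so the snippet compiles on its own, and the FULL default on 
   the base class is an assumption; the real types live in the project.

```java
// Hypothetical wrapper so the sketch is self-contained.
public class MirrorTransactionJavadocSketch {

   // Stand-in for the project's OperationConsistencyLevel.
   enum OperationConsistencyLevel { FULL, STORAGE, IGNORE_REPLICATION }

   // Stand-in for the real transaction base type; the FULL default is assumed.
   static class TransactionStandIn {
      OperationConsistencyLevel getRequiredConsistency() {
         return OperationConsistencyLevel.FULL;
      }
   }

   /**
    * Transaction used when applying mirrored operations on the target.
    *
    * <p>It overrides {@link #getRequiredConsistency()} to ignore replication:
    * if the target server uses replication, sending the message does not wait
    * for the replication roundtrip. The roundtrip is still awaited before the
    * message is acked.
    */
   static class MirrorTransaction extends TransactionStandIn {
      @Override
      OperationConsistencyLevel getRequiredConsistency() {
         return OperationConsistencyLevel.IGNORE_REPLICATION;
      }
   }
}
```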



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java:
##########
@@ -165,56 +185,74 @@ public synchronized void replicationDone() {
 
    @Override
    public void executeOnCompletion(IOCallback runnable) {
-      executeOnCompletion(runnable, false);
+      executeOnCompletion(runnable, OperationConsistencyLevel.FULL);
    }
 
    @Override
-   public void executeOnCompletion(final IOCallback completion, final boolean 
storeOnly) {
+   public void executeOnCompletion(final IOCallback completion, final 
OperationConsistencyLevel consistencyLevel) {
       boolean executeNow = false;
 
       synchronized (this) {
          if (errorCode == -1) {
             final long storeLined = STORE_LINEUP_UPDATER.get(this);
             final long pageLined = PAGE_LINEUP_UPDATER.get(this);
             final long replicationLined = REPLICATION_LINEUP_UPDATER.get(this);
-            if (storeOnly) {
-               if (storeOnlyTasks == null) {
-                  storeOnlyTasks = new LinkedList<>();
-               }
-            } else {
-               if (tasks == null) {
-                  tasks = new LinkedList<>();
-                  minimalReplicated = replicationLined;
-                  minimalStore = storeLined;
-                  minimalPage = pageLined;
-               }
-            }
-            // On this case, we can just execute the context directly
-
-            if (replicationLined == replicated && storeLined == stored && 
pageLined == paged) {
-               // We want to avoid the executor if everything is complete...
-               // However, we can't execute the context if there are 
executions pending
-               // We need to use the executor on this case
-               if (EXECUTORS_PENDING_UPDATER.get(this) == 0) {
-                  // No need to use an executor here or a context switch
-                  // there are no actions pending.. hence we can just execute 
the task directly on the same thread
-                  executeNow = true;
-               } else {
-                  execute(completion);
-               }
-            } else {
-               if (storeOnly) {
-                  if (storeLined == stored && 
EXECUTORS_PENDING_UPDATER.get(this) == 0) {
-                     executeNow = true;
+            switch (consistencyLevel) {
+               case STORAGE:
+                  if (storeOnlyTasks == null) {
+                     storeOnlyTasks = new LinkedList<>();
+                  }
+                  if (storeLined == stored) {
+                     if (hasNoPendingExecution()) {
+                        executeNow = true;
+                     } else {
+                        execute(completion);
+                     }
                   } else {
-                     assert !storeOnlyTasks.isEmpty() ? 
storeOnlyTasks.peekLast().storeLined <= storeLined : true;
                      storeOnlyTasks.add(new StoreOnlyTaskHolder(completion, 
storeLined));
                   }
-               } else {
-                  // ensure total ordering
-                  assert validateTasksAdd(storeLined, replicationLined, 
pageLined);
-                  tasks.add(new TaskHolder(completion, storeLined, 
replicationLined, pageLined));
-               }
+                  break;
+
+               case IGNORE_REPLICATION:
+                  if (ignoreReplicationTasks == null) {
+                     ignoreReplicationTasks = new LinkedList<>();
+                  }
+
+                  if (storeLined == stored && pageLined == paged) {
+                     if (hasNoPendingExecution()) {
+                        // No need to use an executor here or a context switch
+                        // there are no actions pending.. hence we can just 
execute the task directly on the same thread
+                        executeNow = true;
+                     } else {
+                        execute(completion);
+                     }
+                  } else {
+                     ignoreReplicationTasks.add(new TaskHolder(completion, 
storeLined, replicationLined, pageLined));

Review Comment:
   The STORAGE case has its own task holder, to better capture its intent and 
to reduce its memory usage. Should the IGNORE_REPLICATION case have one too?
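
   To illustrate the idea, a sketch of a dedicated holder that drops the 
   replication counter, which this case never compares. The class name 
   IgnoreReplicationTaskHolder, the Callback stand-in and the isReady helper 
   are all hypothetical.

```java
// Hypothetical wrapper so the sketch is self-contained.
public class IgnoreReplicationHolderSketch {

   // Stand-in for the project's IOCallback; only done() is modelled.
   interface Callback {
      void done();
   }

   // Holder that keeps only the counters the IGNORE_REPLICATION case waits on.
   static final class IgnoreReplicationTaskHolder {
      final Callback task;
      final long storeLined;
      final long pageLined;

      IgnoreReplicationTaskHolder(Callback task, long storeLined, long pageLined) {
         this.task = task;
         this.storeLined = storeLined;
         this.pageLined = pageLined;
      }

      // Ready once both the store and page counters have caught up.
      boolean isReady(long stored, long paged) {
         return stored >= storeLined && paged >= pageLined;
      }
   }
}
```

   Compared with reusing TaskHolder, this saves one long per queued task and 
   makes it explicit that replication is not part of the wait condition.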



##########
artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java:
##########
@@ -179,22 +181,22 @@ public void done() {
 
    @Test
    public void testCompletionLateStoreOnly() throws Exception {
-      testCompletionLate(true);
+      testCompletionLate(OperationConsistencyLevel.STORAGE);
    }
 
    @Test
    public void testCompletionLate() throws Exception {
-      testCompletionLate(false);
+      testCompletionLate(OperationConsistencyLevel.FULL);
    }
 
-   private void testCompletionLate(boolean storeOnly) throws Exception {
+   private void testCompletionLate(OperationConsistencyLevel storeType) throws 
Exception {

Review Comment:
   storeType -> consistencyLevel



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java:
##########
@@ -2079,8 +2085,10 @@ public void executeOnCompletion(final IOCallback 
runnable) {
       }
 
       @Override
-      public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
-         executeOnCompletion(runnable);
+      public void executeOnCompletion(IOCallback runnable, 
OperationConsistencyLevel storeType) {

Review Comment:
   storeType -> consistencyLevel



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java:
##########
@@ -154,7 +155,7 @@ public void done() {
       }
 
       @Override
-      public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
+      public void executeOnCompletion(IOCallback runnable, 
OperationConsistencyLevel operationType) {

Review Comment:
   operationType -> consistencyLevel



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java:
##########
@@ -262,46 +291,76 @@ private void checkStoreTasks() {
       final long stored = this.stored;
       for (int i = 0; i < size; i++) {
          final StoreOnlyTaskHolder holder = storeOnlyTasks.peek();
-         if (stored < holder.storeLined) {
-            // fail fast: storeOnlyTasks are ordered by storeLined, there is 
no need to continue
-            return;
+         if (holder != null) {

Review Comment:
   Was this actually needed? If the value is null, it could mean either that 
   the list was empty, which shouldn't happen given the size check above, or 
   that we added a null, which should be guarded against as all the adds are 
   of new holders.
   
   Currently it would also still poll afterwards (below), which feels a bit 
   weird: if this was ever null, something is seriously wrong and throwing 
   would be appropriate.
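
   A sketch of the fail-loudly alternative, under the assumption that the 
   tasks are kept in a LinkedList ordered by storeLined. Holder, drainReady 
   and the exception message are hypothetical names for illustration.

```java
import java.util.LinkedList;

// Hypothetical wrapper so the sketch is self-contained.
public class StoreTaskDrainSketch {

   // Simplified stand-in for StoreOnlyTaskHolder.
   static final class Holder {
      final Runnable task;
      final long storeLined;

      Holder(Runnable task, long storeLined) {
         this.task = task;
         this.storeLined = storeLined;
      }
   }

   // Runs every task whose storeLined has been reached; returns how many ran.
   static int drainReady(LinkedList<Holder> tasks, long stored) {
      final int size = tasks.size();
      int executed = 0;
      for (int i = 0; i < size; i++) {
         final Holder holder = tasks.peek();
         if (holder == null) {
            // The size check says an entry must exist, so this is a broken invariant.
            throw new IllegalStateException("null task holder at position " + i + " of " + size);
         }
         if (stored < holder.storeLined) {
            // Fail fast: tasks are ordered by storeLined, no need to continue.
            return executed;
         }
         tasks.poll();
         holder.task.run();
         executed++;
      }
      return executed;
   }
}
```

   Throwing before the poll means a null entry is never silently removed, 
   so the broken state stays visible to whoever handles the exception.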



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java:
##########
@@ -165,56 +185,74 @@ public synchronized void replicationDone() {
 
    @Override
    public void executeOnCompletion(IOCallback runnable) {
-      executeOnCompletion(runnable, false);
+      executeOnCompletion(runnable, OperationConsistencyLevel.FULL);
    }
 
    @Override
-   public void executeOnCompletion(final IOCallback completion, final boolean 
storeOnly) {
+   public void executeOnCompletion(final IOCallback completion, final 
OperationConsistencyLevel consistencyLevel) {
       boolean executeNow = false;
 
       synchronized (this) {
          if (errorCode == -1) {
             final long storeLined = STORE_LINEUP_UPDATER.get(this);
             final long pageLined = PAGE_LINEUP_UPDATER.get(this);
             final long replicationLined = REPLICATION_LINEUP_UPDATER.get(this);
-            if (storeOnly) {
-               if (storeOnlyTasks == null) {
-                  storeOnlyTasks = new LinkedList<>();
-               }
-            } else {
-               if (tasks == null) {
-                  tasks = new LinkedList<>();
-                  minimalReplicated = replicationLined;
-                  minimalStore = storeLined;
-                  minimalPage = pageLined;
-               }
-            }
-            // On this case, we can just execute the context directly
-
-            if (replicationLined == replicated && storeLined == stored && 
pageLined == paged) {
-               // We want to avoid the executor if everything is complete...
-               // However, we can't execute the context if there are 
executions pending
-               // We need to use the executor on this case
-               if (EXECUTORS_PENDING_UPDATER.get(this) == 0) {
-                  // No need to use an executor here or a context switch
-                  // there are no actions pending.. hence we can just execute 
the task directly on the same thread
-                  executeNow = true;
-               } else {
-                  execute(completion);
-               }
-            } else {
-               if (storeOnly) {
-                  if (storeLined == stored && 
EXECUTORS_PENDING_UPDATER.get(this) == 0) {
-                     executeNow = true;
+            switch (consistencyLevel) {
+               case STORAGE:
+                  if (storeOnlyTasks == null) {
+                     storeOnlyTasks = new LinkedList<>();
+                  }
+                  if (storeLined == stored) {
+                     if (hasNoPendingExecution()) {
+                        executeNow = true;
+                     } else {
+                        execute(completion);
+                     }
                   } else {
-                     assert !storeOnlyTasks.isEmpty() ? 
storeOnlyTasks.peekLast().storeLined <= storeLined : true;
                      storeOnlyTasks.add(new StoreOnlyTaskHolder(completion, 
storeLined));
                   }
-               } else {
-                  // ensure total ordering
-                  assert validateTasksAdd(storeLined, replicationLined, 
pageLined);
-                  tasks.add(new TaskHolder(completion, storeLined, 
replicationLined, pageLined));
-               }
+                  break;
+
+               case IGNORE_REPLICATION:
+                  if (ignoreReplicationTasks == null) {
+                     ignoreReplicationTasks = new LinkedList<>();
+                  }
+
+                  if (storeLined == stored && pageLined == paged) {
+                     if (hasNoPendingExecution()) {
+                        // No need to use an executor here or a context switch
+                        // there are no actions pending.. hence we can just 
execute the task directly on the same thread

Review Comment:
   If the comment is needed, feels like it ought to be on at least the first 
example in the method, rather than the second and third.



##########
tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java:
##########
@@ -6636,7 +6637,7 @@ public void executeOnCompletion(IOCallback runnable) {
       }
 
       @Override
-      public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
+      public void executeOnCompletion(IOCallback runnable, 
OperationConsistencyLevel storeOnly) {

Review Comment:
   storeOnly -> consistencyLevel



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

