[ 
https://issues.apache.org/jira/browse/ARTEMIS-5001?focusedWorklogId=931967&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-931967
 ]

ASF GitHub Bot logged work on ARTEMIS-5001:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Aug/24 13:01
            Start Date: 27/Aug/24 13:01
    Worklog Time Spent: 10m 
      Work Description: gemmellr commented on code in PR #5172:
URL: https://github.com/apache/activemq-artemis/pull/5172#discussion_r1732732990


##########
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java:
##########
@@ -233,6 +233,9 @@ private synchronized void store(MapRecord<K, V> record) {
 
    // callers must be synchronized
    private void removed(MapRecord<K, V> record) {
+      if (logger.isTraceEnabled()) {
+         logger.info("Removing record {}", record, new Exception("trace"));

Review Comment:
   Level mismatch between the gate and the actual logging.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java:
##########
@@ -260,12 +268,12 @@ private void validateExpireSet(long queueID, 
JournalHashMap<AckRetry, AckRetry,
          if (retry.getQueueAttempts() >= 
configuration.getMirrorAckManagerQueueAttempts()) {
             if (retry.attemptedPage() >= 
configuration.getMirrorAckManagerPageAttempts()) {
                if (logger.isDebugEnabled()) {
-                  logger.debug("Retried {} {} times, giving up on the entry 
now", retry, retry.getPageAttempts());
+                  logger.debug("Retried {} {} times, giving up on the entry 
now. Configuration Page Attempts={}", retry, retry.getPageAttempts(), 
configuration.getMirrorAckManagerPageAttempts());
                }
                retries.remove(retry);
             } else {
                if (logger.isDebugEnabled()) {
-                  logger.trace("Retry {} attempted {} times on paging", retry, 
retry.getPageAttempts());
+                  logger.trace("Retry {} attempted {} times on paging, 
Configuration Page Attempts={}", retry, retry.getPageAttempts(), 
configuration.getMirrorAckManagerPageAttempts());

Review Comment:
   Also a (pre-existing) level mismatch between the gate and the actual log 
method.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java:
##########
@@ -35,7 +36,7 @@ public void executeOnCompletion(final IOCallback runnable) {
    }
 
    @Override
-   public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
+   public void executeOnCompletion(IOCallback runnable, 
OperationConsistencyLevel operationType) {

Review Comment:
   operationType -> consistencyLevel



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java:
##########
@@ -477,8 +480,11 @@ private boolean sendMessage(Message message, 
DeliveryAnnotations deliveryAnnotat
          message.setAddress(internalAddress);
       }
 
+      // notice that MirrorTransaction is overriding getRequiredConsistency 
that is being set to ignore Replication.
+      // that means in case the target server is using replication, we will 
not wait for a roundtrip before the message is sent
+      // however we will wait the roundtrip before acking the message
+      // This is to alleviate a situation where messages would take too long 
to be delivered and be ready for ack

Review Comment:
   MirrorTransaction itself would probably benefit from some Javadoc to make 
this more discoverable to anyone using it later.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java:
##########
@@ -165,56 +185,74 @@ public synchronized void replicationDone() {
 
    @Override
    public void executeOnCompletion(IOCallback runnable) {
-      executeOnCompletion(runnable, false);
+      executeOnCompletion(runnable, OperationConsistencyLevel.FULL);
    }
 
    @Override
-   public void executeOnCompletion(final IOCallback completion, final boolean 
storeOnly) {
+   public void executeOnCompletion(final IOCallback completion, final 
OperationConsistencyLevel consistencyLevel) {
       boolean executeNow = false;
 
       synchronized (this) {
          if (errorCode == -1) {
             final long storeLined = STORE_LINEUP_UPDATER.get(this);
             final long pageLined = PAGE_LINEUP_UPDATER.get(this);
             final long replicationLined = REPLICATION_LINEUP_UPDATER.get(this);
-            if (storeOnly) {
-               if (storeOnlyTasks == null) {
-                  storeOnlyTasks = new LinkedList<>();
-               }
-            } else {
-               if (tasks == null) {
-                  tasks = new LinkedList<>();
-                  minimalReplicated = replicationLined;
-                  minimalStore = storeLined;
-                  minimalPage = pageLined;
-               }
-            }
-            // On this case, we can just execute the context directly
-
-            if (replicationLined == replicated && storeLined == stored && 
pageLined == paged) {
-               // We want to avoid the executor if everything is complete...
-               // However, we can't execute the context if there are 
executions pending
-               // We need to use the executor on this case
-               if (EXECUTORS_PENDING_UPDATER.get(this) == 0) {
-                  // No need to use an executor here or a context switch
-                  // there are no actions pending.. hence we can just execute 
the task directly on the same thread
-                  executeNow = true;
-               } else {
-                  execute(completion);
-               }
-            } else {
-               if (storeOnly) {
-                  if (storeLined == stored && 
EXECUTORS_PENDING_UPDATER.get(this) == 0) {
-                     executeNow = true;
+            switch (consistencyLevel) {
+               case STORAGE:
+                  if (storeOnlyTasks == null) {
+                     storeOnlyTasks = new LinkedList<>();
+                  }
+                  if (storeLined == stored) {
+                     if (hasNoPendingExecution()) {
+                        executeNow = true;
+                     } else {
+                        execute(completion);
+                     }
                   } else {
-                     assert !storeOnlyTasks.isEmpty() ? 
storeOnlyTasks.peekLast().storeLined <= storeLined : true;
                      storeOnlyTasks.add(new StoreOnlyTaskHolder(completion, 
storeLined));
                   }
-               } else {
-                  // ensure total ordering
-                  assert validateTasksAdd(storeLined, replicationLined, 
pageLined);
-                  tasks.add(new TaskHolder(completion, storeLined, 
replicationLined, pageLined));
-               }
+                  break;
+
+               case IGNORE_REPLICATION:
+                  if (ignoreReplicationTasks == null) {
+                     ignoreReplicationTasks = new LinkedList<>();
+                  }
+
+                  if (storeLined == stored && pageLined == paged) {
+                     if (hasNoPendingExecution()) {
+                        // No need to use an executor here or a context switch
+                        // there are no actions pending.. hence we can just 
execute the task directly on the same thread
+                        executeNow = true;
+                     } else {
+                        execute(completion);
+                     }
+                  } else {
+                     ignoreReplicationTasks.add(new TaskHolder(completion, 
storeLined, replicationLined, pageLined));

Review Comment:
   The STORAGE case has its own task holder, to better capture its intent and 
to reduce its mem usage. Should this?



##########
artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java:
##########
@@ -179,22 +181,22 @@ public void done() {
 
    @Test
    public void testCompletionLateStoreOnly() throws Exception {
-      testCompletionLate(true);
+      testCompletionLate(OperationConsistencyLevel.STORAGE);
    }
 
    @Test
    public void testCompletionLate() throws Exception {
-      testCompletionLate(false);
+      testCompletionLate(OperationConsistencyLevel.FULL);
    }
 
-   private void testCompletionLate(boolean storeOnly) throws Exception {
+   private void testCompletionLate(OperationConsistencyLevel storeType) throws 
Exception {

Review Comment:
   storeType -> consistencyLevel



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java:
##########
@@ -2079,8 +2085,10 @@ public void executeOnCompletion(final IOCallback 
runnable) {
       }
 
       @Override
-      public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
-         executeOnCompletion(runnable);
+      public void executeOnCompletion(IOCallback runnable, 
OperationConsistencyLevel storeType) {

Review Comment:
   storeType -> consistencyLevel



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java:
##########
@@ -154,7 +155,7 @@ public void done() {
       }
 
       @Override
-      public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
+      public void executeOnCompletion(IOCallback runnable, 
OperationConsistencyLevel operationType) {

Review Comment:
   operationType -> consistencyLevel



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java:
##########
@@ -262,46 +291,76 @@ private void checkStoreTasks() {
       final long stored = this.stored;
       for (int i = 0; i < size; i++) {
          final StoreOnlyTaskHolder holder = storeOnlyTasks.peek();
-         if (stored < holder.storeLined) {
-            // fail fast: storeOnlyTasks are ordered by storeLined, there is 
no need to continue
-            return;
+         if (holder != null) {

Review Comment:
   Was this actually needed? If the value is null, it could mean the list was 
empty which shouldnt happen given the size check above, or that we added a null 
which should be guarded against as all the adds are of new holders.
   
   Currently it would also still poll afterwards (below), which feels a bit 
weird as it seems like something is seriously wrong if this was ever null and 
throwing would be appropriate.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java:
##########
@@ -165,56 +185,74 @@ public synchronized void replicationDone() {
 
    @Override
    public void executeOnCompletion(IOCallback runnable) {
-      executeOnCompletion(runnable, false);
+      executeOnCompletion(runnable, OperationConsistencyLevel.FULL);
    }
 
    @Override
-   public void executeOnCompletion(final IOCallback completion, final boolean 
storeOnly) {
+   public void executeOnCompletion(final IOCallback completion, final 
OperationConsistencyLevel consistencyLevel) {
       boolean executeNow = false;
 
       synchronized (this) {
          if (errorCode == -1) {
             final long storeLined = STORE_LINEUP_UPDATER.get(this);
             final long pageLined = PAGE_LINEUP_UPDATER.get(this);
             final long replicationLined = REPLICATION_LINEUP_UPDATER.get(this);
-            if (storeOnly) {
-               if (storeOnlyTasks == null) {
-                  storeOnlyTasks = new LinkedList<>();
-               }
-            } else {
-               if (tasks == null) {
-                  tasks = new LinkedList<>();
-                  minimalReplicated = replicationLined;
-                  minimalStore = storeLined;
-                  minimalPage = pageLined;
-               }
-            }
-            // On this case, we can just execute the context directly
-
-            if (replicationLined == replicated && storeLined == stored && 
pageLined == paged) {
-               // We want to avoid the executor if everything is complete...
-               // However, we can't execute the context if there are 
executions pending
-               // We need to use the executor on this case
-               if (EXECUTORS_PENDING_UPDATER.get(this) == 0) {
-                  // No need to use an executor here or a context switch
-                  // there are no actions pending.. hence we can just execute 
the task directly on the same thread
-                  executeNow = true;
-               } else {
-                  execute(completion);
-               }
-            } else {
-               if (storeOnly) {
-                  if (storeLined == stored && 
EXECUTORS_PENDING_UPDATER.get(this) == 0) {
-                     executeNow = true;
+            switch (consistencyLevel) {
+               case STORAGE:
+                  if (storeOnlyTasks == null) {
+                     storeOnlyTasks = new LinkedList<>();
+                  }
+                  if (storeLined == stored) {
+                     if (hasNoPendingExecution()) {
+                        executeNow = true;
+                     } else {
+                        execute(completion);
+                     }
                   } else {
-                     assert !storeOnlyTasks.isEmpty() ? 
storeOnlyTasks.peekLast().storeLined <= storeLined : true;
                      storeOnlyTasks.add(new StoreOnlyTaskHolder(completion, 
storeLined));
                   }
-               } else {
-                  // ensure total ordering
-                  assert validateTasksAdd(storeLined, replicationLined, 
pageLined);
-                  tasks.add(new TaskHolder(completion, storeLined, 
replicationLined, pageLined));
-               }
+                  break;
+
+               case IGNORE_REPLICATION:
+                  if (ignoreReplicationTasks == null) {
+                     ignoreReplicationTasks = new LinkedList<>();
+                  }
+
+                  if (storeLined == stored && pageLined == paged) {
+                     if (hasNoPendingExecution()) {
+                        // No need to use an executor here or a context switch
+                        // there are no actions pending.. hence we can just 
execute the task directly on the same thread

Review Comment:
   If the comment is needed, feels like it ought to be on at least the first 
example in the method, rather than the second and third.



##########
tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java:
##########
@@ -6636,7 +6637,7 @@ public void executeOnCompletion(IOCallback runnable) {
       }
 
       @Override
-      public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
+      public void executeOnCompletion(IOCallback runnable, 
OperationConsistencyLevel storeOnly) {

Review Comment:
   storeOnly -> consistencyLevel





Issue Time Tracking
-------------------

    Worklog Id:     (was: 931967)
    Time Spent: 4h 50m  (was: 4h 40m)

> Add configuration option to relax syncs journal replication for Mirror Target
> -----------------------------------------------------------------------------
>
>                 Key: ARTEMIS-5001
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-5001
>             Project: ActiveMQ Artemis
>          Issue Type: Improvement
>    Affects Versions: 2.37.0
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>             Fix For: 2.38.0
>
>          Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> When I worked on AMQP Mirror I did not actually envision being used with 
> journal replication. I actually thought more about adding multiple mirrored 
> options instead.
> However an user reported me that when using mirror and journal replication 
> combined, the sends could take a lot longer to happen (some normal latency) 
> and the acks would eventually be missed.
> I should add an option to ignore the replication for the Mirror Target.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to