gemmellr commented on code in PR #5172: URL: https://github.com/apache/activemq-artemis/pull/5172#discussion_r1732732990
########## artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java: ########## @@ -233,6 +233,9 @@ private synchronized void store(MapRecord<K, V> record) { // callers must be synchronized private void removed(MapRecord<K, V> record) { + if (logger.isTraceEnabled()) { + logger.info("Removing record {}", record, new Exception("trace")); Review Comment: Level mismatch between the gate and the actual logging. ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java: ########## @@ -260,12 +268,12 @@ private void validateExpireSet(long queueID, JournalHashMap<AckRetry, AckRetry, if (retry.getQueueAttempts() >= configuration.getMirrorAckManagerQueueAttempts()) { if (retry.attemptedPage() >= configuration.getMirrorAckManagerPageAttempts()) { if (logger.isDebugEnabled()) { - logger.debug("Retried {} {} times, giving up on the entry now", retry, retry.getPageAttempts()); + logger.debug("Retried {} {} times, giving up on the entry now. Configuration Page Attempts={}", retry, retry.getPageAttempts(), configuration.getMirrorAckManagerPageAttempts()); } retries.remove(retry); } else { if (logger.isDebugEnabled()) { - logger.trace("Retry {} attempted {} times on paging", retry, retry.getPageAttempts()); + logger.trace("Retry {} attempted {} times on paging, Configuration Page Attempts={}", retry, retry.getPageAttempts(), configuration.getMirrorAckManagerPageAttempts()); Review Comment: Also a (pre-existing) level mismatch between the gate and the actual log method. 
########## artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java: ########## @@ -35,7 +36,7 @@ public void executeOnCompletion(final IOCallback runnable) { } @Override - public void executeOnCompletion(IOCallback runnable, boolean storeOnly) { + public void executeOnCompletion(IOCallback runnable, OperationConsistencyLevel operationType) { Review Comment: operationType -> consistencyLevel ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java: ########## @@ -477,8 +480,11 @@ private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotat message.setAddress(internalAddress); } + // notice that MirrorTransaction is overriding getRequiredConsistency that is being set to ignore Replication. + // that means in case the target server is using replication, we will not wait for a roundtrip before the message is sent + // however we will wait the roundtrip before acking the message + // This is to alleviate a situation where messages would take too long to be delivered and be ready for ack Review Comment: MirrorTransaction itself would probably benefit from some Javadoc to make this more discoverable to anyone using it later. 
########## artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java: ########## @@ -165,56 +185,74 @@ public synchronized void replicationDone() { @Override public void executeOnCompletion(IOCallback runnable) { - executeOnCompletion(runnable, false); + executeOnCompletion(runnable, OperationConsistencyLevel.FULL); } @Override - public void executeOnCompletion(final IOCallback completion, final boolean storeOnly) { + public void executeOnCompletion(final IOCallback completion, final OperationConsistencyLevel consistencyLevel) { boolean executeNow = false; synchronized (this) { if (errorCode == -1) { final long storeLined = STORE_LINEUP_UPDATER.get(this); final long pageLined = PAGE_LINEUP_UPDATER.get(this); final long replicationLined = REPLICATION_LINEUP_UPDATER.get(this); - if (storeOnly) { - if (storeOnlyTasks == null) { - storeOnlyTasks = new LinkedList<>(); - } - } else { - if (tasks == null) { - tasks = new LinkedList<>(); - minimalReplicated = replicationLined; - minimalStore = storeLined; - minimalPage = pageLined; - } - } - // On this case, we can just execute the context directly - - if (replicationLined == replicated && storeLined == stored && pageLined == paged) { - // We want to avoid the executor if everything is complete... - // However, we can't execute the context if there are executions pending - // We need to use the executor on this case - if (EXECUTORS_PENDING_UPDATER.get(this) == 0) { - // No need to use an executor here or a context switch - // there are no actions pending.. 
hence we can just execute the task directly on the same thread - executeNow = true; - } else { - execute(completion); - } - } else { - if (storeOnly) { - if (storeLined == stored && EXECUTORS_PENDING_UPDATER.get(this) == 0) { - executeNow = true; + switch (consistencyLevel) { + case STORAGE: + if (storeOnlyTasks == null) { + storeOnlyTasks = new LinkedList<>(); + } + if (storeLined == stored) { + if (hasNoPendingExecution()) { + executeNow = true; + } else { + execute(completion); + } } else { - assert !storeOnlyTasks.isEmpty() ? storeOnlyTasks.peekLast().storeLined <= storeLined : true; storeOnlyTasks.add(new StoreOnlyTaskHolder(completion, storeLined)); } - } else { - // ensure total ordering - assert validateTasksAdd(storeLined, replicationLined, pageLined); - tasks.add(new TaskHolder(completion, storeLined, replicationLined, pageLined)); - } + break; + + case IGNORE_REPLICATION: + if (ignoreReplicationTasks == null) { + ignoreReplicationTasks = new LinkedList<>(); + } + + if (storeLined == stored && pageLined == paged) { + if (hasNoPendingExecution()) { + // No need to use an executor here or a context switch + // there are no actions pending.. hence we can just execute the task directly on the same thread + executeNow = true; + } else { + execute(completion); + } + } else { + ignoreReplicationTasks.add(new TaskHolder(completion, storeLined, replicationLined, pageLined)); Review Comment: The STORAGE case has its own task holder, to better capture its intent and to reduce its mem usage. Should this? 
########## artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java: ########## @@ -179,22 +181,22 @@ public void done() { @Test public void testCompletionLateStoreOnly() throws Exception { - testCompletionLate(true); + testCompletionLate(OperationConsistencyLevel.STORAGE); } @Test public void testCompletionLate() throws Exception { - testCompletionLate(false); + testCompletionLate(OperationConsistencyLevel.FULL); } - private void testCompletionLate(boolean storeOnly) throws Exception { + private void testCompletionLate(OperationConsistencyLevel storeType) throws Exception { Review Comment: storeType -> consistencyLevel ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java: ########## @@ -2079,8 +2085,10 @@ public void executeOnCompletion(final IOCallback runnable) { } @Override - public void executeOnCompletion(IOCallback runnable, boolean storeOnly) { - executeOnCompletion(runnable); + public void executeOnCompletion(IOCallback runnable, OperationConsistencyLevel storeType) { Review Comment: storeType -> consistencyLevel ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java: ########## @@ -154,7 +155,7 @@ public void done() { } @Override - public void executeOnCompletion(IOCallback runnable, boolean storeOnly) { + public void executeOnCompletion(IOCallback runnable, OperationConsistencyLevel operationType) { Review Comment: operationType -> consistencyLevel ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java: ########## @@ -262,46 +291,76 @@ private void checkStoreTasks() { final long stored = this.stored; for (int i = 0; i < size; i++) { final StoreOnlyTaskHolder holder = storeOnlyTasks.peek(); - if (stored < holder.storeLined) { - // fail fast: storeOnlyTasks are ordered by storeLined, 
there is no need to continue - return; + if (holder != null) { Review Comment: Was this actually needed? If the value is null, it could mean the list was empty which shouldn't happen given the size check above, or that we added a null which should be guarded against as all the adds are of new holders. Currently it would also still poll afterwards (below), which feels a bit weird as it seems like something is seriously wrong if this was ever null and throwing would be appropriate. ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java: ########## @@ -165,56 +185,74 @@ public synchronized void replicationDone() { @Override public void executeOnCompletion(IOCallback runnable) { - executeOnCompletion(runnable, false); + executeOnCompletion(runnable, OperationConsistencyLevel.FULL); } @Override - public void executeOnCompletion(final IOCallback completion, final boolean storeOnly) { + public void executeOnCompletion(final IOCallback completion, final OperationConsistencyLevel consistencyLevel) { boolean executeNow = false; synchronized (this) { if (errorCode == -1) { final long storeLined = STORE_LINEUP_UPDATER.get(this); final long pageLined = PAGE_LINEUP_UPDATER.get(this); final long replicationLined = REPLICATION_LINEUP_UPDATER.get(this); - if (storeOnly) { - if (storeOnlyTasks == null) { - storeOnlyTasks = new LinkedList<>(); - } - } else { - if (tasks == null) { - tasks = new LinkedList<>(); - minimalReplicated = replicationLined; - minimalStore = storeLined; - minimalPage = pageLined; - } - } - // On this case, we can just execute the context directly - - if (replicationLined == replicated && storeLined == stored && pageLined == paged) { - // We want to avoid the executor if everything is complete... 
- // However, we can't execute the context if there are executions pending - // We need to use the executor on this case - if (EXECUTORS_PENDING_UPDATER.get(this) == 0) { - // No need to use an executor here or a context switch - // there are no actions pending.. hence we can just execute the task directly on the same thread - executeNow = true; - } else { - execute(completion); - } - } else { - if (storeOnly) { - if (storeLined == stored && EXECUTORS_PENDING_UPDATER.get(this) == 0) { - executeNow = true; + switch (consistencyLevel) { + case STORAGE: + if (storeOnlyTasks == null) { + storeOnlyTasks = new LinkedList<>(); + } + if (storeLined == stored) { + if (hasNoPendingExecution()) { + executeNow = true; + } else { + execute(completion); + } } else { - assert !storeOnlyTasks.isEmpty() ? storeOnlyTasks.peekLast().storeLined <= storeLined : true; storeOnlyTasks.add(new StoreOnlyTaskHolder(completion, storeLined)); } - } else { - // ensure total ordering - assert validateTasksAdd(storeLined, replicationLined, pageLined); - tasks.add(new TaskHolder(completion, storeLined, replicationLined, pageLined)); - } + break; + + case IGNORE_REPLICATION: + if (ignoreReplicationTasks == null) { + ignoreReplicationTasks = new LinkedList<>(); + } + + if (storeLined == stored && pageLined == paged) { + if (hasNoPendingExecution()) { + // No need to use an executor here or a context switch + // there are no actions pending.. hence we can just execute the task directly on the same thread Review Comment: If the comment is needed, feels like it ought to be on at least the first example in the method, rather than the second and third. 
########## tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java: ########## @@ -6636,7 +6637,7 @@ public void executeOnCompletion(IOCallback runnable) { } @Override - public void executeOnCompletion(IOCallback runnable, boolean storeOnly) { + public void executeOnCompletion(IOCallback runnable, OperationConsistencyLevel storeOnly) { Review Comment: storeOnly -> consistencyLevel -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For additional commands, e-mail: gitbox-h...@activemq.apache.org For further information, visit: https://activemq.apache.org/contact