[
https://issues.apache.org/jira/browse/ARTEMIS-5001?focusedWorklogId=931967&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-931967
]
ASF GitHub Bot logged work on ARTEMIS-5001:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 27/Aug/24 13:01
Start Date: 27/Aug/24 13:01
Worklog Time Spent: 10m
Work Description: gemmellr commented on code in PR #5172:
URL: https://github.com/apache/activemq-artemis/pull/5172#discussion_r1732732990
##########
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java:
##########
@@ -233,6 +233,9 @@ private synchronized void store(MapRecord<K, V> record) {
// callers must be synchronized
private void removed(MapRecord<K, V> record) {
+ if (logger.isTraceEnabled()) {
+ logger.info("Removing record {}", record, new Exception("trace"));
Review Comment:
Level mismatch between the gate and the actual logging: the guard checks
isTraceEnabled() but the call is logger.info().
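A minimal sketch of the aligned version, where the guard and the log call use the same level. The PR uses SLF4J-style calls; this sketch substitutes java.util.logging purely so that it compiles without dependencies. `GateLevelSketch`, `removed` and `capture` are hypothetical names for illustration, not code from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

final class GateLevelSketch {

    // Hypothetical stand-in for JournalHashMap.removed(...): the guard and the
    // log call both use FINEST (the java.util.logging counterpart of trace).
    static void removed(Logger logger, Object record) {
        if (logger.isLoggable(Level.FINEST)) {
            logger.log(Level.FINEST, "Removing record " + record, new Exception("trace"));
        }
    }

    // Demo helper (an assumption of this sketch): runs removed(...) against a
    // logger configured at the given level and returns what was published.
    static List<LogRecord> capture(Level level) {
        final List<LogRecord> records = new ArrayList<>();
        Logger logger = Logger.getAnonymousLogger();
        logger.setUseParentHandlers(false);
        logger.setLevel(level);
        logger.addHandler(new Handler() {
            @Override public void publish(LogRecord r) { records.add(r); }
            @Override public void flush() { }
            @Override public void close() { }
        });
        removed(logger, "record-1");
        return records;
    }

    public static void main(String[] args) {
        // Number of records published with the logger at INFO, then at FINEST.
        System.out.println(capture(Level.INFO).size());
        System.out.println(capture(Level.FINEST).size());
    }
}
```

With the levels aligned, nothing is emitted (and no stack trace is built) unless trace-level logging is actually enabled.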
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java:
##########
@@ -260,12 +268,12 @@ private void validateExpireSet(long queueID, JournalHashMap<AckRetry, AckRetry,
if (retry.getQueueAttempts() >= configuration.getMirrorAckManagerQueueAttempts()) {
if (retry.attemptedPage() >= configuration.getMirrorAckManagerPageAttempts()) {
if (logger.isDebugEnabled()) {
- logger.debug("Retried {} {} times, giving up on the entry now", retry, retry.getPageAttempts());
+ logger.debug("Retried {} {} times, giving up on the entry now. Configuration Page Attempts={}", retry, retry.getPageAttempts(), configuration.getMirrorAckManagerPageAttempts());
}
retries.remove(retry);
} else {
if (logger.isDebugEnabled()) {
- logger.trace("Retry {} attempted {} times on paging", retry, retry.getPageAttempts());
+ logger.trace("Retry {} attempted {} times on paging, Configuration Page Attempts={}", retry, retry.getPageAttempts(), configuration.getMirrorAckManagerPageAttempts());
Review Comment:
Also a (pre-existing) level mismatch between the gate and the actual log
method: the guard checks isDebugEnabled() but the call is logger.trace().
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java:
##########
@@ -35,7 +36,7 @@ public void executeOnCompletion(final IOCallback runnable) {
}
@Override
- public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
+ public void executeOnCompletion(IOCallback runnable, OperationConsistencyLevel operationType) {
Review Comment:
operationType -> consistencyLevel
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java:
##########
@@ -477,8 +480,11 @@ private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotat
message.setAddress(internalAddress);
}
+ // notice that MirrorTransaction is overriding getRequiredConsistency that is being set to ignore Replication.
+ // that means in case the target server is using replication, we will not wait for a roundtrip before the message is sent
+ // however we will wait the roundtrip before acking the message
+ // This is to alleviate a situation where messages would take too long to be delivered and be ready for ack
Review Comment:
MirrorTransaction itself would probably benefit from some Javadoc to make
this more discoverable to anyone using it later.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java:
##########
@@ -165,56 +185,74 @@ public synchronized void replicationDone() {
@Override
public void executeOnCompletion(IOCallback runnable) {
- executeOnCompletion(runnable, false);
+ executeOnCompletion(runnable, OperationConsistencyLevel.FULL);
}
@Override
- public void executeOnCompletion(final IOCallback completion, final boolean storeOnly) {
+ public void executeOnCompletion(final IOCallback completion, final OperationConsistencyLevel consistencyLevel) {
boolean executeNow = false;
synchronized (this) {
if (errorCode == -1) {
final long storeLined = STORE_LINEUP_UPDATER.get(this);
final long pageLined = PAGE_LINEUP_UPDATER.get(this);
final long replicationLined = REPLICATION_LINEUP_UPDATER.get(this);
- if (storeOnly) {
- if (storeOnlyTasks == null) {
- storeOnlyTasks = new LinkedList<>();
- }
- } else {
- if (tasks == null) {
- tasks = new LinkedList<>();
- minimalReplicated = replicationLined;
- minimalStore = storeLined;
- minimalPage = pageLined;
- }
- }
- // On this case, we can just execute the context directly
-
- if (replicationLined == replicated && storeLined == stored && pageLined == paged) {
- // We want to avoid the executor if everything is complete...
- // However, we can't execute the context if there are executions pending
- // We need to use the executor on this case
- if (EXECUTORS_PENDING_UPDATER.get(this) == 0) {
- // No need to use an executor here or a context switch
- // there are no actions pending.. hence we can just execute the task directly on the same thread
- executeNow = true;
- } else {
- execute(completion);
- }
- } else {
- if (storeOnly) {
- if (storeLined == stored && EXECUTORS_PENDING_UPDATER.get(this) == 0) {
- executeNow = true;
+ switch (consistencyLevel) {
+ case STORAGE:
+ if (storeOnlyTasks == null) {
+ storeOnlyTasks = new LinkedList<>();
+ }
+ if (storeLined == stored) {
+ if (hasNoPendingExecution()) {
+ executeNow = true;
+ } else {
+ execute(completion);
+ }
} else {
- assert !storeOnlyTasks.isEmpty() ? storeOnlyTasks.peekLast().storeLined <= storeLined : true;
storeOnlyTasks.add(new StoreOnlyTaskHolder(completion, storeLined));
}
- } else {
- // ensure total ordering
- assert validateTasksAdd(storeLined, replicationLined, pageLined);
- tasks.add(new TaskHolder(completion, storeLined, replicationLined, pageLined));
- }
+ break;
+
+ case IGNORE_REPLICATION:
+ if (ignoreReplicationTasks == null) {
+ ignoreReplicationTasks = new LinkedList<>();
+ }
+
+ if (storeLined == stored && pageLined == paged) {
+ if (hasNoPendingExecution()) {
+ // No need to use an executor here or a context switch
+ // there are no actions pending.. hence we can just execute the task directly on the same thread
+ executeNow = true;
+ } else {
+ execute(completion);
+ }
+ } else {
+ ignoreReplicationTasks.add(new TaskHolder(completion, storeLined, replicationLined, pageLined));
Review Comment:
The STORAGE case has its own task holder, to better capture its intent and
to reduce its memory usage. Should this case have one as well?
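A rough sketch of what such a dedicated holder could look like, for discussion only. `StoreAndPageTaskHolder` and `isReady` are hypothetical names, `Runnable` stands in for `IOCallback`, and the `>=` readiness check is an assumption modelled on the `stored < holder.storeLined` test used for store-only tasks.

```java
final class IgnoreReplicationHolderSketch {

    // Hypothetical holder, following the idea behind StoreOnlyTaskHolder: it keeps
    // only the counters the IGNORE_REPLICATION case compares (store and page),
    // so no replicationLined field is carried around.
    static final class StoreAndPageTaskHolder {
        final Runnable task; // stand-in for IOCallback
        final long storeLined;
        final long pageLined;

        StoreAndPageTaskHolder(Runnable task, long storeLined, long pageLined) {
            this.task = task;
            this.storeLined = storeLined;
            this.pageLined = pageLined;
        }

        // Ready once both counters have caught up; replication is not consulted.
        boolean isReady(long stored, long paged) {
            return stored >= storeLined && paged >= pageLined;
        }
    }

    public static void main(String[] args) {
        StoreAndPageTaskHolder holder = new StoreAndPageTaskHolder(() -> { }, 3, 2);
        System.out.println(holder.isReady(3, 1)); // paging has not caught up yet
        System.out.println(holder.isReady(3, 2)); // both counters reached
    }
}
```

Beyond saving one long per queued task, a separate type makes it harder to accidentally compare against the replication counter in this path.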
##########
artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java:
##########
@@ -179,22 +181,22 @@ public void done() {
@Test
public void testCompletionLateStoreOnly() throws Exception {
- testCompletionLate(true);
+ testCompletionLate(OperationConsistencyLevel.STORAGE);
}
@Test
public void testCompletionLate() throws Exception {
- testCompletionLate(false);
+ testCompletionLate(OperationConsistencyLevel.FULL);
}
- private void testCompletionLate(boolean storeOnly) throws Exception {
+ private void testCompletionLate(OperationConsistencyLevel storeType) throws Exception {
Review Comment:
storeType -> consistencyLevel
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java:
##########
@@ -2079,8 +2085,10 @@ public void executeOnCompletion(final IOCallback runnable) {
}
@Override
- public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
- executeOnCompletion(runnable);
+ public void executeOnCompletion(IOCallback runnable, OperationConsistencyLevel storeType) {
Review Comment:
storeType -> consistencyLevel
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java:
##########
@@ -154,7 +155,7 @@ public void done() {
}
@Override
- public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
+ public void executeOnCompletion(IOCallback runnable, OperationConsistencyLevel operationType) {
Review Comment:
operationType -> consistencyLevel
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java:
##########
@@ -262,46 +291,76 @@ private void checkStoreTasks() {
final long stored = this.stored;
for (int i = 0; i < size; i++) {
final StoreOnlyTaskHolder holder = storeOnlyTasks.peek();
- if (stored < holder.storeLined) {
- // fail fast: storeOnlyTasks are ordered by storeLined, there is no need to continue
- return;
+ if (holder != null) {
Review Comment:
Was this actually needed? If the value is null, it could mean the list was
empty, which shouldn't happen given the size check above, or that we added a
null, which should be guarded against as all the adds are of new holders.
Currently it would also still poll afterwards (below), which feels a bit
weird: if this were ever null, something is seriously wrong and throwing
would be appropriate.
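A minimal sketch of the "throw instead of skip" alternative, under stated assumptions: `CheckStoreTasksSketch` is a hypothetical class, `Runnable` stands in for `IOCallback`, the task is run inline rather than through the context's executor, and the method takes the list and counter as parameters only to keep the example self-contained.

```java
import java.util.LinkedList;

final class CheckStoreTasksSketch {

    // Stand-in for the PR's StoreOnlyTaskHolder; Runnable replaces IOCallback.
    static final class StoreOnlyTaskHolder {
        final Runnable task;
        final long storeLined;

        StoreOnlyTaskHolder(Runnable task, long storeLined) {
            this.task = task;
            this.storeLined = storeLined;
        }
    }

    // Runs every task whose storeLined has been reached and returns how many ran.
    // A null head is treated as a broken invariant instead of being skipped.
    static int checkStoreTasks(LinkedList<StoreOnlyTaskHolder> storeOnlyTasks, long stored) {
        final int size = storeOnlyTasks.size();
        int executed = 0;
        for (int i = 0; i < size; i++) {
            final StoreOnlyTaskHolder holder = storeOnlyTasks.peek();
            if (holder == null) {
                throw new IllegalStateException("storeOnlyTasks held a null entry");
            }
            if (stored < holder.storeLined) {
                // fail fast: tasks are ordered by storeLined, no need to continue
                return executed;
            }
            storeOnlyTasks.poll();
            holder.task.run();
            executed++;
        }
        return executed;
    }

    public static void main(String[] args) {
        LinkedList<StoreOnlyTaskHolder> tasks = new LinkedList<>();
        tasks.add(new StoreOnlyTaskHolder(() -> { }, 1));
        tasks.add(new StoreOnlyTaskHolder(() -> { }, 2));
        tasks.add(new StoreOnlyTaskHolder(() -> { }, 5));
        System.out.println(checkStoreTasks(tasks, 2)); // tasks executed
        System.out.println(tasks.size());              // tasks still waiting
    }
}
```

This keeps the original fail-fast return and only polls after a non-null holder has been validated.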
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java:
##########
@@ -165,56 +185,74 @@ public synchronized void replicationDone() {
@Override
public void executeOnCompletion(IOCallback runnable) {
- executeOnCompletion(runnable, false);
+ executeOnCompletion(runnable, OperationConsistencyLevel.FULL);
}
@Override
- public void executeOnCompletion(final IOCallback completion, final boolean storeOnly) {
+ public void executeOnCompletion(final IOCallback completion, final OperationConsistencyLevel consistencyLevel) {
boolean executeNow = false;
synchronized (this) {
if (errorCode == -1) {
final long storeLined = STORE_LINEUP_UPDATER.get(this);
final long pageLined = PAGE_LINEUP_UPDATER.get(this);
final long replicationLined = REPLICATION_LINEUP_UPDATER.get(this);
- if (storeOnly) {
- if (storeOnlyTasks == null) {
- storeOnlyTasks = new LinkedList<>();
- }
- } else {
- if (tasks == null) {
- tasks = new LinkedList<>();
- minimalReplicated = replicationLined;
- minimalStore = storeLined;
- minimalPage = pageLined;
- }
- }
- // On this case, we can just execute the context directly
-
- if (replicationLined == replicated && storeLined == stored && pageLined == paged) {
- // We want to avoid the executor if everything is complete...
- // However, we can't execute the context if there are executions pending
- // We need to use the executor on this case
- if (EXECUTORS_PENDING_UPDATER.get(this) == 0) {
- // No need to use an executor here or a context switch
- // there are no actions pending.. hence we can just execute the task directly on the same thread
- executeNow = true;
- } else {
- execute(completion);
- }
- } else {
- if (storeOnly) {
- if (storeLined == stored && EXECUTORS_PENDING_UPDATER.get(this) == 0) {
- executeNow = true;
+ switch (consistencyLevel) {
+ case STORAGE:
+ if (storeOnlyTasks == null) {
+ storeOnlyTasks = new LinkedList<>();
+ }
+ if (storeLined == stored) {
+ if (hasNoPendingExecution()) {
+ executeNow = true;
+ } else {
+ execute(completion);
+ }
} else {
- assert !storeOnlyTasks.isEmpty() ? storeOnlyTasks.peekLast().storeLined <= storeLined : true;
storeOnlyTasks.add(new StoreOnlyTaskHolder(completion, storeLined));
}
- } else {
- // ensure total ordering
- assert validateTasksAdd(storeLined, replicationLined, pageLined);
- tasks.add(new TaskHolder(completion, storeLined, replicationLined, pageLined));
- }
+ break;
+
+ case IGNORE_REPLICATION:
+ if (ignoreReplicationTasks == null) {
+ ignoreReplicationTasks = new LinkedList<>();
+ }
+
+ if (storeLined == stored && pageLined == paged) {
+ if (hasNoPendingExecution()) {
+ // No need to use an executor here or a context switch
+ // there are no actions pending.. hence we can just execute the task directly on the same thread
Review Comment:
If the comment is needed, it feels like it ought to be on at least the first
occurrence in the method, rather than only on the second and third.
##########
tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java:
##########
@@ -6636,7 +6637,7 @@ public void executeOnCompletion(IOCallback runnable) {
}
@Override
- public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
+ public void executeOnCompletion(IOCallback runnable, OperationConsistencyLevel storeOnly) {
Review Comment:
storeOnly -> consistencyLevel
Issue Time Tracking
-------------------
Worklog Id: (was: 931967)
Time Spent: 4h 50m (was: 4h 40m)
> Add configuration option to relax syncs journal replication for Mirror Target
> -----------------------------------------------------------------------------
>
> Key: ARTEMIS-5001
> URL: https://issues.apache.org/jira/browse/ARTEMIS-5001
> Project: ActiveMQ Artemis
> Issue Type: Improvement
> Affects Versions: 2.37.0
> Reporter: Clebert Suconic
> Assignee: Clebert Suconic
> Priority: Major
> Fix For: 2.38.0
>
> Time Spent: 4h 50m
> Remaining Estimate: 0h
>
> When I worked on AMQP Mirror I did not actually envision it being used with
> journal replication. I actually thought more about adding multiple mirrored
> options instead.
> However, a user reported to me that when using mirror and journal replication
> combined, the sends could take a lot longer to happen (some normal latency)
> and the acks would eventually be missed.
> I should add an option to ignore the replication for the Mirror Target.