[
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906431&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906431
]
ASF GitHub Bot logged work on ARTEMIS-4651:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 22/Feb/24 12:50
Start Date: 22/Feb/24 12:50
Worklog Time Spent: 10m
Work Description: gemmellr commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499051782
##########
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/AbstractHashMapPersister.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+public abstract class AbstractHashMapPersister<K, V> implements
Persister<JournalHashMap.MapRecord<K, V>> {
+
+ @Override
+ public byte getID() {
+ return 0;
+ }
+
+ @Override
+ public final int getEncodeSize(JournalHashMap.MapRecord<K, V> record) {
+ return DataConstants.SIZE_BYTE + // FILLER, could be used for versioning
in the future
+ DataConstants.SIZE_LONG + // recordID
+ DataConstants.SIZE_LONG + // collectionID
+ getKeySize(record.key) +
+ getValueSize(record.value);
+ }
+
+ protected abstract int getKeySize(K key);
+
+ protected abstract void encodeKey(ActiveMQBuffer buffer, K key);
+
+ protected abstract K decodeKey(ActiveMQBuffer buffer);
+
+ protected abstract int getValueSize(V value);
+
+ protected abstract void encodeValue(ActiveMQBuffer buffer, V value);
+
+ protected abstract V decodeValue(ActiveMQBuffer buffer, K key);
+
+ @Override
+ public final void encode(ActiveMQBuffer buffer, JournalHashMap.MapRecord<K,
V> record) {
+ buffer.writeByte((byte)0); // filler - could be used for versioning in
the future.
+ buffer.writeLong(record.id);
+ buffer.writeLong(record.collectionID);
+ encodeKey(buffer, record.key);
+ encodeValue(buffer, record.value);
+ }
+
+ @Override
+ public final JournalHashMap.MapRecord<K, V> decode(ActiveMQBuffer buffer,
+ JournalHashMap.MapRecord<K, V>
record,
+ CoreMessageObjectPools pool) {
+ buffer.readByte(); // filler - not used currently - just in case we ever
need to version this wiring
+ long id = buffer.readLong();
+ long collectionID = buffer.readLong();
+ K key = decodeKey(buffer);
+ V value = decodeValue(buffer, key);
+
+ JournalHashMap.MapRecord<K, V> mapRecord = new
JournalHashMap.MapRecord<>(collectionID, id, key, value);
Review Comment:
Is there a reason the persister puts 'id' first, while the object
representation (the MapRecord constructor) puts 'collectionID' first?
It would be nice if they were consistent, unless there is a reason they
shouldn't be.
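One way to make the two orders agree, as a minimal sketch: the wire format below writes collectionID before id, matching the constructor's argument order. `SketchRecord` and `SketchPersister` are hypothetical names, plain `java.nio.ByteBuffer` stands in for `ActiveMQBuffer`, and the key and value are fixed to `long` purely to keep the example short. Reordering the constructor arguments instead would work equally well.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for JournalHashMap.MapRecord (long key/value for brevity).
final class SketchRecord {
   final long collectionID;
   final long id;
   final long key;
   final long value;

   SketchRecord(long collectionID, long id, long key, long value) {
      this.collectionID = collectionID;
      this.id = id;
      this.key = key;
      this.value = value;
   }
}

// Hypothetical persister: fields are written and read in constructor order.
final class SketchPersister {
   static final int ENCODE_SIZE = Byte.BYTES + 4 * Long.BYTES;

   static void encode(ByteBuffer buffer, SketchRecord record) {
      buffer.put((byte) 0);               // leading byte, as in the PR
      buffer.putLong(record.collectionID);
      buffer.putLong(record.id);
      buffer.putLong(record.key);
      buffer.putLong(record.value);
   }

   static SketchRecord decode(ByteBuffer buffer) {
      buffer.get();                       // skip the leading byte
      long collectionID = buffer.getLong();
      long id = buffer.getLong();
      long key = buffer.getLong();
      long value = buffer.getLong();
      return new SketchRecord(collectionID, id, key, value);
   }
}
```

With one order used everywhere, the read sequence in decode() can be checked against the constructor call at a glance.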
##########
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/AbstractHashMapPersister.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+public abstract class AbstractHashMapPersister<K, V> implements
Persister<JournalHashMap.MapRecord<K, V>> {
+
+ @Override
+ public byte getID() {
+ return 0;
+ }
+
+ @Override
+ public final int getEncodeSize(JournalHashMap.MapRecord<K, V> record) {
+ return DataConstants.SIZE_BYTE + // FILLER, could be used for versioning
in the future
+ DataConstants.SIZE_LONG + // recordID
+ DataConstants.SIZE_LONG + // collectionID
+ getKeySize(record.key) +
+ getValueSize(record.value);
+ }
+
+ protected abstract int getKeySize(K key);
+
+ protected abstract void encodeKey(ActiveMQBuffer buffer, K key);
+
+ protected abstract K decodeKey(ActiveMQBuffer buffer);
+
+ protected abstract int getValueSize(V value);
+
+ protected abstract void encodeValue(ActiveMQBuffer buffer, V value);
+
+ protected abstract V decodeValue(ActiveMQBuffer buffer, K key);
+
+ @Override
+ public final void encode(ActiveMQBuffer buffer, JournalHashMap.MapRecord<K,
V> record) {
+ buffer.writeByte((byte)0); // filler - could be used for versioning in
the future.
+ buffer.writeLong(record.id);
+ buffer.writeLong(record.collectionID);
+ encodeKey(buffer, record.key);
+ encodeValue(buffer, record.value);
+ }
+
+ @Override
+ public final JournalHashMap.MapRecord<K, V> decode(ActiveMQBuffer buffer,
+ JournalHashMap.MapRecord<K, V>
record,
+ CoreMessageObjectPools pool) {
+ buffer.readByte(); // filler - not used currently - just in case we ever
need to version this wiring
Review Comment:
Could we just say it's the version now, and make the code and comments (which
are perhaps then deletable) clearer for later?
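A rough sketch of what treating the byte as a version could look like. The names `VersionedHeaderSketch` and `CURRENT_VERSION` are hypothetical, `java.nio.ByteBuffer` stands in for `ActiveMQBuffer`, and rejecting unknown versions is only one possible policy, assumed here for illustration.

```java
import java.nio.ByteBuffer;

final class VersionedHeaderSketch {
   // Hypothetical constant: 0 matches the value the current filler byte writes.
   static final byte CURRENT_VERSION = 0;

   static void writeHeader(ByteBuffer buffer, long id, long collectionID) {
      buffer.put(CURRENT_VERSION);
      buffer.putLong(id);
      buffer.putLong(collectionID);
   }

   // Returns {id, collectionID}; rejects versions this code does not know about.
   static long[] readHeader(ByteBuffer buffer) {
      byte version = buffer.get();
      if (version != CURRENT_VERSION) {
         throw new IllegalStateException("Unsupported record version " + version);
      }
      long id = buffer.getLong();
      long collectionID = buffer.getLong();
      return new long[] {id, collectionID};
   }
}
```

Naming the byte makes the "filler" comments unnecessary, and a later format change only needs a new constant plus a branch in the reader.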
##########
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JournalHashMap<K, V, C> implements Map<K, V> {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public static class MapRecord<K, V> implements Entry<K, V> {
+ final long collectionID;
+ long id;
+ K key;
+ V value;
+
+ MapRecord(long collectionID, long id, K key, V value) {
+ this.collectionID = collectionID;
+ this.id = id;
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public K getKey() {
+ return key;
+ }
+
+ @Override
+ public V getValue() {
+ return value;
+ }
+
+ @Override
+ public V setValue(V value) {
+ V oldValue = this.value;
+ this.value = value;
+ return oldValue;
+ }
+
+ @Override
+ public String toString() {
+ return "MapRecord{" + "collectionID=" + collectionID + ", id=" + id +
", key=" + key + ", value=" + value + '}';
+ }
+ }
+
+ public JournalHashMap(long collectionId, Journal journal, LongSupplier
idGenerator, Persister<MapRecord<K, V>> persister, byte recordType,
Supplier<IOCompletion> completionSupplier, LongFunction<C> contextProvider,
IOCriticalErrorListener ioExceptionListener) {
+ this.collectionId = collectionId;
+ this.journal = journal;
+ this.idGenerator = idGenerator;
+ this.persister = persister;
+ this.recordType = recordType;
+ this.exceptionListener = ioExceptionListener;
+ this.completionSupplier = completionSupplier;
+ this.contextProvider = contextProvider;
+ }
+
+ C context;
+
+ LongFunction<C> contextProvider;
+
+ private final Persister<MapRecord<K, V>> persister;
+
+ private final Journal journal;
+
+ private final long collectionId;
+
+ private final byte recordType;
+
+ private final LongSupplier idGenerator;
+
+ private final Supplier<IOCompletion> completionSupplier;
+
+ private final IOCriticalErrorListener exceptionListener;
+
+ private final Map<K, MapRecord<K, V>> map = new HashMap<>();
+
+ public long getCollectionId() {
+ return collectionId;
+ }
+
+ @Override
+ public synchronized int size() {
+ return map.size();
+ }
+
+ public C getContext() {
+ if (context == null && contextProvider != null) {
+ context = contextProvider.apply(this.collectionId);
+ }
+ return context;
+ }
+
+ public JournalHashMap<K, V, C> setContext(C context) {
+ this.context = context;
+ return this;
+ }
+
+ @Override
+ public synchronized boolean isEmpty() {
+ return map.isEmpty();
+ }
+
+ @Override
+ public synchronized boolean containsKey(Object key) {
+ return map.containsKey(key);
+ }
+
+ @Override
+ public synchronized boolean containsValue(Object value) {
+ for (Entry<K, MapRecord<K, V>> entry : map.entrySet()) {
+ if (value.equals(entry.getValue().value)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public synchronized V get(Object key) {
+ MapRecord<K, V> reccord = map.get(key);
+ if (reccord == null) {
+ return null;
+ } else {
+ return reccord.value;
+ }
+ }
+
+ /** This is to be called from a single thread during reload, no need to be
synchronized */
+ public void reload(MapRecord<K, V> reloadValue) {
+ map.put(reloadValue.getKey(), reloadValue);
+ }
+
+ @Override
+ public synchronized V put(K key, V value) {
+ logger.debug("adding {} = {}", key, value);
+ long id = idGenerator.getAsLong();
+ MapRecord<K, V> record = new MapRecord(collectionId, id, key, value);
+ store(key, record);
+ MapRecord<K, V> oldRecord = map.put(key, record);
+
+ if (oldRecord != null) {
+ removed(oldRecord);
+ return oldRecord.value;
+ } else {
+ return null;
+ }
+
+ }
+
+ private synchronized void store(K key, MapRecord<K, V> record) {
+ try {
+ IOCompletion callback = null;
+ if (completionSupplier != null) {
+ callback = completionSupplier.get();
+ }
+
+ if (callback == null) {
+ journal.appendAddRecord(record.id, recordType, persister, record,
false);
+ } else {
+ journal.appendAddRecord(record.id, recordType, persister, record,
true, callback);
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ exceptionListener.onIOException(e, e.getMessage(), null);
+ }
+ }
+
+ // callers must be synchronized
+ private void removed(MapRecord reccord) {
Review Comment:
Typo: 'reccord' should be 'record'. Also, add the type info (MapRecord<K, V>).
##########
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JournalHashMap<K, V, C> implements Map<K, V> {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public static class MapRecord<K, V> implements Entry<K, V> {
+ final long collectionID;
+ long id;
+ K key;
+ V value;
+
+ MapRecord(long collectionID, long id, K key, V value) {
+ this.collectionID = collectionID;
+ this.id = id;
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public K getKey() {
+ return key;
+ }
+
+ @Override
+ public V getValue() {
+ return value;
+ }
+
+ @Override
+ public V setValue(V value) {
+ V oldValue = this.value;
+ this.value = value;
+ return oldValue;
+ }
+
+ @Override
+ public String toString() {
+ return "MapRecord{" + "collectionID=" + collectionID + ", id=" + id +
", key=" + key + ", value=" + value + '}';
+ }
+ }
+
+ public JournalHashMap(long collectionId, Journal journal, LongSupplier
idGenerator, Persister<MapRecord<K, V>> persister, byte recordType,
Supplier<IOCompletion> completionSupplier, LongFunction<C> contextProvider,
IOCriticalErrorListener ioExceptionListener) {
+ this.collectionId = collectionId;
+ this.journal = journal;
+ this.idGenerator = idGenerator;
+ this.persister = persister;
+ this.recordType = recordType;
+ this.exceptionListener = ioExceptionListener;
+ this.completionSupplier = completionSupplier;
+ this.contextProvider = contextProvider;
+ }
+
+ C context;
+
+ LongFunction<C> contextProvider;
+
+ private final Persister<MapRecord<K, V>> persister;
+
+ private final Journal journal;
+
+ private final long collectionId;
+
+ private final byte recordType;
+
+ private final LongSupplier idGenerator;
+
+ private final Supplier<IOCompletion> completionSupplier;
+
+ private final IOCriticalErrorListener exceptionListener;
+
+ private final Map<K, MapRecord<K, V>> map = new HashMap<>();
+
+ public long getCollectionId() {
+ return collectionId;
+ }
+
+ @Override
+ public synchronized int size() {
+ return map.size();
+ }
+
+ public C getContext() {
+ if (context == null && contextProvider != null) {
+ context = contextProvider.apply(this.collectionId);
+ }
+ return context;
+ }
+
+ public JournalHashMap<K, V, C> setContext(C context) {
+ this.context = context;
+ return this;
+ }
+
+ @Override
+ public synchronized boolean isEmpty() {
+ return map.isEmpty();
+ }
+
+ @Override
+ public synchronized boolean containsKey(Object key) {
+ return map.containsKey(key);
+ }
+
+ @Override
+ public synchronized boolean containsValue(Object value) {
+ for (Entry<K, MapRecord<K, V>> entry : map.entrySet()) {
+ if (value.equals(entry.getValue().value)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public synchronized V get(Object key) {
+ MapRecord<K, V> reccord = map.get(key);
+ if (reccord == null) {
+ return null;
+ } else {
+ return reccord.value;
+ }
+ }
+
+ /** This is to be called from a single thread during reload, no need to be
synchronized */
+ public void reload(MapRecord<K, V> reloadValue) {
+ map.put(reloadValue.getKey(), reloadValue);
+ }
+
+ @Override
+ public synchronized V put(K key, V value) {
+ logger.debug("adding {} = {}", key, value);
+ long id = idGenerator.getAsLong();
+ MapRecord<K, V> record = new MapRecord(collectionId, id, key, value);
+ store(key, record);
+ MapRecord<K, V> oldRecord = map.put(key, record);
+
+ if (oldRecord != null) {
+ removed(oldRecord);
+ return oldRecord.value;
+ } else {
+ return null;
+ }
+
+ }
+
+ private synchronized void store(K key, MapRecord<K, V> record) {
+ try {
+ IOCompletion callback = null;
+ if (completionSupplier != null) {
+ callback = completionSupplier.get();
+ }
+
+ if (callback == null) {
+ journal.appendAddRecord(record.id, recordType, persister, record,
false);
+ } else {
+ journal.appendAddRecord(record.id, recordType, persister, record,
true, callback);
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ exceptionListener.onIOException(e, e.getMessage(), null);
+ }
+ }
+
+ // callers must be synchronized
+ private void removed(MapRecord reccord) {
+ try {
+ journal.appendDeleteRecord(reccord.id, false);
+ } catch (Exception e) {
+ exceptionListener.onIOException(e, e.getMessage(), null);
+ }
+ }
+
+ // callers must be synchronized
+ private void removed(MapRecord reccord, long txid) {
+ try {
+ journal.appendDeleteRecordTransactional(txid, reccord.id);
+ } catch (Exception e) {
+ exceptionListener.onIOException(e, e.getMessage(), null);
+ }
+ }
+
+ @Override
+ public synchronized V remove(Object key) {
+ MapRecord<K, V> record = map.remove(key);
+ this.removed(record);
+ return record.value;
+ }
+
+ /** This method will remove the element from the HashMap immediately
however the record is still part of a transaction.
+ * This is not playing with rollbacks. So a rollback on the transaction
wouldn't place the elements back.
+ * This is intended to make sure the operation would be atomic in case of
a failure, while an appendRollback is not expected. */
+ public synchronized V remove(Object key, long transactionID) {
+ MapRecord<K, V> record = map.remove(key);
+ this.removed(record, transactionID);
+ return record.value;
+ }
+
+ @Override
+ public synchronized void putAll(Map<? extends K, ? extends V> m) {
+ m.forEach(this::put);
+ }
+
+ @Override
+ public synchronized void clear() {
+ map.values().forEach(v -> remove(v));
+ map.clear();
+ }
+
+ @Override
+ public synchronized Set<K> keySet() {
+ HashSet<K> keys = new HashSet(map.size());
+ map.values().forEach(v -> keys.add(v.key));
+ return keys;
+ }
Review Comment:
Pretty sure this violates the Map contract for keySet(), which says the
returned Set is backed by the map's contents.
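For illustration, a minimal sketch of a keySet() that is a live view rather than a copy. `BackedKeySetSketch` is a hypothetical, simplified class: a plain `HashMap` holds the values directly, and the `deletedKeys` list stands in for the journal delete that the real class would issue. Iteration is not synchronized here; as with `Collections.synchronizedMap`, callers would need to hold the map's lock while iterating.

```java
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class BackedKeySetSketch<K, V> {
   // Hypothetical stand-in for the journal: keys whose records were deleted.
   final List<K> deletedKeys = new ArrayList<>();
   private final Map<K, V> map = new HashMap<>();

   synchronized V put(K key, V value) {
      return map.put(key, value);
   }

   synchronized int size() {
      return map.size();
   }

   // Returns a view: later puts show up in it, and removals write through.
   Set<K> keySet() {
      return new AbstractSet<K>() {
         @Override
         public int size() {
            return BackedKeySetSketch.this.size();
         }

         @Override
         public boolean contains(Object o) {
            synchronized (BackedKeySetSketch.this) {
               return map.containsKey(o);
            }
         }

         @Override
         public Iterator<K> iterator() {
            final Iterator<K> delegate = map.keySet().iterator();
            return new Iterator<K>() {
               private K last;

               @Override
               public boolean hasNext() {
                  return delegate.hasNext();
               }

               @Override
               public K next() {
                  last = delegate.next();
                  return last;
               }

               @Override
               public void remove() {
                  delegate.remove();       // removes the entry from the map
                  deletedKeys.add(last);   // where the journal delete would go
               }
            };
         }
      };
   }
}
```

Because `AbstractSet` implements remove(Object) on top of the iterator, a removal through the view reaches both the map and the delete hook.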
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java:
##########
@@ -43,6 +43,8 @@ public class AmqpSupport {
public static final int AMQP_CREDITS_DEFAULT = 1000;
public static final int AMQP_LOW_CREDITS_DEFAULT = 300;
+ public static final int AMQP_MIRROR_ACK_RETRY_INTERVAL = 10_000;
+
Review Comment:
Is this used any more?
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java:
##########
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.LongSupplier;
+
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.journal.collections.JournalHashMap;
+import
org.apache.activemq.artemis.core.journal.collections.JournalHashMapProvider;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import
org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
+import
org.apache.activemq.artemis.core.persistence.impl.journal.codec.AckRetry;
+import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.mirror.MirrorController;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AckManager implements ActiveMQComponent {
+
+ // we first retry on the queue a few tiems
+ public static final short MAX_QUEUE_ATTEMPT =
Short.parseShort(System.getProperty(AckRetry.class.getName() +
".MAX_QUEUE_ATTEMPT", "5"));
Review Comment:
Typo: tiems -> times.
I'd probably change or remove 'a few times', given it's actually 5.
Do we retry on the queue after trying paging?
##########
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JournalHashMap<K, V, C> implements Map<K, V> {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public static class MapRecord<K, V> implements Entry<K, V> {
+ final long collectionID;
+ long id;
+ K key;
+ V value;
Review Comment:
Why is only the first one final, when apparently only the last one can be
changed?
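A sketch of the field declarations this question points towards, assuming nothing else in the PR reassigns `id` or `key` (that code is not visible in this hunk). `FinalFieldsRecordSketch` is a hypothetical name standing in for MapRecord.

```java
import java.util.Map;

final class FinalFieldsRecordSketch<K, V> implements Map.Entry<K, V> {
   final long collectionID;
   final long id;
   final K key;
   V value; // the only field setValue() changes

   FinalFieldsRecordSketch(long collectionID, long id, K key, V value) {
      this.collectionID = collectionID;
      this.id = id;
      this.key = key;
      this.value = value;
   }

   @Override
   public K getKey() {
      return key;
   }

   @Override
   public V getValue() {
      return value;
   }

   @Override
   public V setValue(V value) {
      V oldValue = this.value;
      this.value = value;
      return oldValue;
   }
}
```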
##########
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JournalHashMap<K, V, C> implements Map<K, V> {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public static class MapRecord<K, V> implements Entry<K, V> {
+ final long collectionID;
+ long id;
+ K key;
+ V value;
+
+ MapRecord(long collectionID, long id, K key, V value) {
+ this.collectionID = collectionID;
+ this.id = id;
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public K getKey() {
+ return key;
+ }
+
+ @Override
+ public V getValue() {
+ return value;
+ }
+
+ @Override
+ public V setValue(V value) {
+ V oldValue = this.value;
+ this.value = value;
+ return oldValue;
+ }
+
+ @Override
+ public String toString() {
+ return "MapRecord{" + "collectionID=" + collectionID + ", id=" + id +
", key=" + key + ", value=" + value + '}';
+ }
+ }
+
+ public JournalHashMap(long collectionId, Journal journal, LongSupplier
idGenerator, Persister<MapRecord<K, V>> persister, byte recordType,
Supplier<IOCompletion> completionSupplier, LongFunction<C> contextProvider,
IOCriticalErrorListener ioExceptionListener) {
+ this.collectionId = collectionId;
+ this.journal = journal;
+ this.idGenerator = idGenerator;
+ this.persister = persister;
+ this.recordType = recordType;
+ this.exceptionListener = ioExceptionListener;
+ this.completionSupplier = completionSupplier;
+ this.contextProvider = contextProvider;
+ }
+
+ C context;
+
+ LongFunction<C> contextProvider;
+
+ private final Persister<MapRecord<K, V>> persister;
+
+ private final Journal journal;
+
+ private final long collectionId;
+
+ private final byte recordType;
+
+ private final LongSupplier idGenerator;
+
+ private final Supplier<IOCompletion> completionSupplier;
+
+ private final IOCriticalErrorListener exceptionListener;
+
+ private final Map<K, MapRecord<K, V>> map = new HashMap<>();
+
+ public long getCollectionId() {
+ return collectionId;
+ }
+
+ @Override
+ public synchronized int size() {
+ return map.size();
+ }
+
+ public C getContext() {
+ if (context == null && contextProvider != null) {
+ context = contextProvider.apply(this.collectionId);
+ }
+ return context;
+ }
+
+ public JournalHashMap<K, V, C> setContext(C context) {
+ this.context = context;
+ return this;
+ }
+
+ @Override
+ public synchronized boolean isEmpty() {
+ return map.isEmpty();
+ }
+
+ @Override
+ public synchronized boolean containsKey(Object key) {
+ return map.containsKey(key);
+ }
+
+ @Override
+ public synchronized boolean containsValue(Object value) {
+ for (Entry<K, MapRecord<K, V>> entry : map.entrySet()) {
+ if (value.equals(entry.getValue().value)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public synchronized V get(Object key) {
+ MapRecord<K, V> reccord = map.get(key);
+ if (reccord == null) {
+ return null;
+ } else {
+ return reccord.value;
+ }
+ }
+
+ /** This is to be called from a single thread during reload, no need to be
synchronized */
+ public void reload(MapRecord<K, V> reloadValue) {
+ map.put(reloadValue.getKey(), reloadValue);
+ }
+
+ @Override
+ public synchronized V put(K key, V value) {
+ logger.debug("adding {} = {}", key, value);
+ long id = idGenerator.getAsLong();
+ MapRecord<K, V> record = new MapRecord(collectionId, id, key, value);
+ store(key, record);
+ MapRecord<K, V> oldRecord = map.put(key, record);
+
+ if (oldRecord != null) {
+ removed(oldRecord);
+ return oldRecord.value;
+ } else {
+ return null;
+ }
+
+ }
+
+ private synchronized void store(K key, MapRecord<K, V> record) {
+ try {
+ IOCompletion callback = null;
+ if (completionSupplier != null) {
+ callback = completionSupplier.get();
+ }
+
+ if (callback == null) {
+ journal.appendAddRecord(record.id, recordType, persister, record,
false);
+ } else {
+ journal.appendAddRecord(record.id, recordType, persister, record,
true, callback);
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ exceptionListener.onIOException(e, e.getMessage(), null);
+ }
+ }
+
+ // callers must be synchronized
+ private void removed(MapRecord reccord) {
+ try {
+ journal.appendDeleteRecord(reccord.id, false);
+ } catch (Exception e) {
+ exceptionListener.onIOException(e, e.getMessage(), null);
+ }
+ }
+
+ // callers must be synchronized
+ private void removed(MapRecord reccord, long txid) {
Review Comment:
Same as above: typo ('reccord'), and add the type info.
##########
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JournalHashMap<K, V, C> implements Map<K, V> {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public static class MapRecord<K, V> implements Entry<K, V> {
+ final long collectionID;
+ long id;
+ K key;
+ V value;
+
+ MapRecord(long collectionID, long id, K key, V value) {
+ this.collectionID = collectionID;
+ this.id = id;
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public K getKey() {
+ return key;
+ }
+
+ @Override
+ public V getValue() {
+ return value;
+ }
+
+ @Override
+ public V setValue(V value) {
+ V oldValue = this.value;
+ this.value = value;
+ return oldValue;
+ }
+
+ @Override
+ public String toString() {
+ return "MapRecord{" + "collectionID=" + collectionID + ", id=" + id +
", key=" + key + ", value=" + value + '}';
+ }
+ }
+
+ public JournalHashMap(long collectionId, Journal journal, LongSupplier
idGenerator, Persister<MapRecord<K, V>> persister, byte recordType,
Supplier<IOCompletion> completionSupplier, LongFunction<C> contextProvider,
IOCriticalErrorListener ioExceptionListener) {
+ this.collectionId = collectionId;
+ this.journal = journal;
+ this.idGenerator = idGenerator;
+ this.persister = persister;
+ this.recordType = recordType;
+ this.exceptionListener = ioExceptionListener;
+ this.completionSupplier = completionSupplier;
+ this.contextProvider = contextProvider;
+ }
+
+ C context;
+
+ LongFunction<C> contextProvider;
+
+ private final Persister<MapRecord<K, V>> persister;
+
+ private final Journal journal;
+
+ private final long collectionId;
+
+ private final byte recordType;
+
+ private final LongSupplier idGenerator;
+
+ private final Supplier<IOCompletion> completionSupplier;
+
+ private final IOCriticalErrorListener exceptionListener;
+
+ private final Map<K, MapRecord<K, V>> map = new HashMap<>();
+
+ public long getCollectionId() {
+ return collectionId;
+ }
+
+ @Override
+ public synchronized int size() {
+ return map.size();
+ }
+
+ public C getContext() {
+ if (context == null && contextProvider != null) {
+ context = contextProvider.apply(this.collectionId);
+ }
+ return context;
+ }
+
+ public JournalHashMap<K, V, C> setContext(C context) {
+ this.context = context;
+ return this;
+ }
+
+ @Override
+ public synchronized boolean isEmpty() {
+ return map.isEmpty();
+ }
+
+ @Override
+ public synchronized boolean containsKey(Object key) {
+ return map.containsKey(key);
+ }
+
+ @Override
+ public synchronized boolean containsValue(Object value) {
+ for (Entry<K, MapRecord<K, V>> entry : map.entrySet()) {
+ if (value.equals(entry.getValue().value)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public synchronized V get(Object key) {
+ MapRecord<K, V> reccord = map.get(key);
+ if (reccord == null) {
+ return null;
+ } else {
+ return reccord.value;
+ }
+ }
+
+ /** This is to be called from a single thread during reload, no need to be
synchronized */
+ public void reload(MapRecord<K, V> reloadValue) {
+ map.put(reloadValue.getKey(), reloadValue);
+ }
+
+ @Override
+ public synchronized V put(K key, V value) {
+ logger.debug("adding {} = {}", key, value);
+ long id = idGenerator.getAsLong();
+ MapRecord<K, V> record = new MapRecord(collectionId, id, key, value);
+ store(key, record);
+ MapRecord<K, V> oldRecord = map.put(key, record);
+
+ if (oldRecord != null) {
+ removed(oldRecord);
+ return oldRecord.value;
+ } else {
+ return null;
+ }
+
+ }
+
+ private synchronized void store(K key, MapRecord<K, V> record) {
+ try {
+ IOCompletion callback = null;
+ if (completionSupplier != null) {
+ callback = completionSupplier.get();
+ }
+
+ if (callback == null) {
+ journal.appendAddRecord(record.id, recordType, persister, record,
false);
+ } else {
+ journal.appendAddRecord(record.id, recordType, persister, record,
true, callback);
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ exceptionListener.onIOException(e, e.getMessage(), null);
+ }
+ }
+
+ // callers must be synchronized
+ private void removed(MapRecord reccord) {
+ try {
+ journal.appendDeleteRecord(reccord.id, false);
+ } catch (Exception e) {
+ exceptionListener.onIOException(e, e.getMessage(), null);
+ }
+ }
+
+ // callers must be synchronized
+ private void removed(MapRecord reccord, long txid) {
+ try {
+ journal.appendDeleteRecordTransactional(txid, reccord.id);
+ } catch (Exception e) {
+ exceptionListener.onIOException(e, e.getMessage(), null);
+ }
+ }
+
+ @Override
+ public synchronized V remove(Object key) {
+ MapRecord<K, V> record = map.remove(key);
+ this.removed(record);
+ return record.value;
+ }
+
+ /** This method will remove the element from the HashMap immediately
however the record is still part of a transaction.
+ * This is not playing with rollbacks. So a rollback on the transaction
wouldn't place the elements back.
+ * This is intended to make sure the operation would be atomic in case of
a failure, while an appendRollback is not expected. */
+ public synchronized V remove(Object key, long transactionID) {
+ MapRecord<K, V> record = map.remove(key);
+ this.removed(record, transactionID);
+ return record.value;
+ }
+
+ @Override
+ public synchronized void putAll(Map<? extends K, ? extends V> m) {
+ m.forEach(this::put);
+ }
+
+ @Override
+ public synchronized void clear() {
+ map.values().forEach(v -> remove(v));
+ map.clear();
+ }
+
+ @Override
+ public synchronized Set<K> keySet() {
+ HashSet<K> keys = new HashSet(map.size());
+ map.values().forEach(v -> keys.add(v.key));
+ return keys;
+ }
+
+ @Override
+ public synchronized Collection<V> values() {
+ ArrayList values = new ArrayList(map.size());
+ map.values().forEach(v -> values.add(v.value));
+ return values;
+ }
Review Comment:
Similarly violates the Map contract
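Assuming the concern is that the Map javadoc describes values() as a view backed by the map, while the code above returns a detached copy, a rough sketch of a read-only view could look like the following. ValuesViewSketch and Holder are hypothetical stand-ins for JournalHashMap and MapRecord, and synchronization is left out for brevity, so this is only an illustration of the idea, not a drop-in replacement.

```java
import java.util.AbstractCollection;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical stand-in for JournalHashMap, reduced to what values() needs.
class ValuesViewSketch<K, V> {

   // Hypothetical stand-in for MapRecord.
   static final class Holder<V> {
      final V value;

      Holder(V value) {
         this.value = value;
      }
   }

   private final Map<K, Holder<V>> map = new HashMap<>();

   void put(K key, V value) {
      map.put(key, new Holder<>(value));
   }

   // Returns a live, read-only view: later changes to the map are visible through it.
   Collection<V> values() {
      return new AbstractCollection<V>() {
         @Override
         public Iterator<V> iterator() {
            final Iterator<Holder<V>> it = map.values().iterator();
            return new Iterator<V>() {
               @Override
               public boolean hasNext() {
                  return it.hasNext();
               }

               @Override
               public V next() {
                  return it.next().value;
               }
               // remove() is not overridden, so it throws UnsupportedOperationException
            };
         }

         @Override
         public int size() {
            return map.size();
         }
      };
   }
}
```

Supporting removal through the view would additionally need the journal delete to be appended from the iterator's remove(), which this sketch deliberately does not attempt.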
##########
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JournalHashMap<K, V, C> implements Map<K, V> {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public static class MapRecord<K, V> implements Entry<K, V> {
+ final long collectionID;
+ long id;
+ K key;
+ V value;
+
+ MapRecord(long collectionID, long id, K key, V value) {
+ this.collectionID = collectionID;
+ this.id = id;
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public K getKey() {
+ return key;
+ }
+
+ @Override
+ public V getValue() {
+ return value;
+ }
+
+ @Override
+ public V setValue(V value) {
+ V oldValue = this.value;
+ this.value = value;
+ return oldValue;
+ }
+
+ @Override
+ public String toString() {
+ return "MapRecord{" + "collectionID=" + collectionID + ", id=" + id +
", key=" + key + ", value=" + value + '}';
+ }
+ }
+
+ public JournalHashMap(long collectionId, Journal journal, LongSupplier
idGenerator, Persister<MapRecord<K, V>> persister, byte recordType,
Supplier<IOCompletion> completionSupplier, LongFunction<C> contextProvider,
IOCriticalErrorListener ioExceptionListener) {
+ this.collectionId = collectionId;
+ this.journal = journal;
+ this.idGenerator = idGenerator;
+ this.persister = persister;
+ this.recordType = recordType;
+ this.exceptionListener = ioExceptionListener;
+ this.completionSupplier = completionSupplier;
+ this.contextProvider = contextProvider;
+ }
+
+ C context;
+
+ LongFunction<C> contextProvider;
+
+ private final Persister<MapRecord<K, V>> persister;
+
+ private final Journal journal;
+
+ private final long collectionId;
+
+ private final byte recordType;
+
+ private final LongSupplier idGenerator;
+
+ private final Supplier<IOCompletion> completionSupplier;
+
+ private final IOCriticalErrorListener exceptionListener;
+
+ private final Map<K, MapRecord<K, V>> map = new HashMap<>();
+
+ public long getCollectionId() {
+ return collectionId;
+ }
+
+ @Override
+ public synchronized int size() {
+ return map.size();
+ }
+
+ public C getContext() {
+ if (context == null && contextProvider != null) {
+ context = contextProvider.apply(this.collectionId);
+ }
+ return context;
+ }
+
+ public JournalHashMap<K, V, C> setContext(C context) {
+ this.context = context;
+ return this;
+ }
+
+ @Override
+ public synchronized boolean isEmpty() {
+ return map.isEmpty();
+ }
+
+ @Override
+ public synchronized boolean containsKey(Object key) {
+ return map.containsKey(key);
+ }
+
+ @Override
+ public synchronized boolean containsValue(Object value) {
+ for (Entry<K, MapRecord<K, V>> entry : map.entrySet()) {
+ if (value.equals(entry.getValue().value)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public synchronized V get(Object key) {
+ MapRecord<K, V> reccord = map.get(key);
+ if (reccord == null) {
+ return null;
+ } else {
+ return reccord.value;
+ }
+ }
+
+ /** This is to be called from a single thread during reload, no need to be
synchronized */
+ public void reload(MapRecord<K, V> reloadValue) {
+ map.put(reloadValue.getKey(), reloadValue);
+ }
+
+ @Override
+ public synchronized V put(K key, V value) {
+ logger.debug("adding {} = {}", key, value);
+ long id = idGenerator.getAsLong();
+ MapRecord<K, V> record = new MapRecord(collectionId, id, key, value);
+ store(key, record);
+ MapRecord<K, V> oldRecord = map.put(key, record);
+
+ if (oldRecord != null) {
+ removed(oldRecord);
+ return oldRecord.value;
+ } else {
+ return null;
+ }
+
+ }
+
+ private synchronized void store(K key, MapRecord<K, V> record) {
+ try {
+ IOCompletion callback = null;
+ if (completionSupplier != null) {
+ callback = completionSupplier.get();
+ }
+
+ if (callback == null) {
+ journal.appendAddRecord(record.id, recordType, persister, record,
false);
+ } else {
+ journal.appendAddRecord(record.id, recordType, persister, record,
true, callback);
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ exceptionListener.onIOException(e, e.getMessage(), null);
+ }
+ }
+
+ // callers must be synchronized
+ private void removed(MapRecord reccord) {
+ try {
+ journal.appendDeleteRecord(reccord.id, false);
+ } catch (Exception e) {
+ exceptionListener.onIOException(e, e.getMessage(), null);
+ }
+ }
+
+ // callers must be synchronized
+ private void removed(MapRecord reccord, long txid) {
+ try {
+ journal.appendDeleteRecordTransactional(txid, reccord.id);
+ } catch (Exception e) {
+ exceptionListener.onIOException(e, e.getMessage(), null);
+ }
+ }
+
+ @Override
+ public synchronized V remove(Object key) {
+ MapRecord<K, V> record = map.remove(key);
+ this.removed(record);
+ return record.value;
+ }
+
+ /** This method will remove the element from the HashMap immediately
however the record is still part of a transaction.
+ * This is not playing with rollbacks. So a rollback on the transaction
wouldn't place the elements back.
+ * This is intended to make sure the operation would be atomic in case of
a failure, while an appendRollback is not expected. */
+ public synchronized V remove(Object key, long transactionID) {
+ MapRecord<K, V> record = map.remove(key);
+ this.removed(record, transactionID);
+ return record.value;
+ }
+
+ @Override
+ public synchronized void putAll(Map<? extends K, ? extends V> m) {
+ m.forEach(this::put);
+ }
+
+ @Override
+ public synchronized void clear() {
+ map.values().forEach(v -> remove(v));
+ map.clear();
+ }
+
+ @Override
+ public synchronized Set<K> keySet() {
+ HashSet<K> keys = new HashSet(map.size());
+ map.values().forEach(v -> keys.add(v.key));
+ return keys;
+ }
+
+ @Override
+ public synchronized Collection<V> values() {
+ ArrayList values = new ArrayList(map.size());
+ map.values().forEach(v -> values.add(v.value));
+ return values;
+ }
+
+ @Override
+ public synchronized Set<Entry<K, V>> entrySet() {
+ HashSet<Entry<K, V>> values = new HashSet<>();
+ map.values().forEach(values::add);
+ return values;
+ }
Review Comment:
Ditto
##########
artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java:
##########
@@ -288,6 +298,15 @@ public synchronized void stop() {
}
+ @Override
+ public void run() {
+ if (scheduledTask == null) {
+ logger.warn("Task not defined on {}", this.getClass());
+ } else {
+ scheduledTask.run();
+ }
+ }
+
Review Comment:
I think it would probably be better to leave this class abstract and
implement run in the new Component, as every other use of the class did and
still does. This class already has too many different ways to use it from what
I recall of previous looks. This basically just adds another.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -424,6 +453,10 @@ public void preAcknowledge(final Transaction tx, final
MessageReference ref, fin
MirrorController controllerInUse = getControllerInUse();
+ if (controllerInUse != null && !controllerInUse.isAllowACK()) {
Review Comment:
This comment relates to code just above this change, not the change it is
against.
The trace log at the start of the method looks wrong: it references
postACKInternalMessage, which is a separate method further up, when this is
actually preAcknowledge.
##########
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JournalHashMap<K, V, C> implements Map<K, V> {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public static class MapRecord<K, V> implements Entry<K, V> {
+ final long collectionID;
+ long id;
+ K key;
+ V value;
+
+ MapRecord(long collectionID, long id, K key, V value) {
+ this.collectionID = collectionID;
+ this.id = id;
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public K getKey() {
+ return key;
+ }
+
+ @Override
+ public V getValue() {
+ return value;
+ }
+
+ @Override
+ public V setValue(V value) {
+ V oldValue = this.value;
+ this.value = value;
+ return oldValue;
+ }
+
+ @Override
+ public String toString() {
+ return "MapRecord{" + "collectionID=" + collectionID + ", id=" + id +
", key=" + key + ", value=" + value + '}';
+ }
+ }
+
+ public JournalHashMap(long collectionId, Journal journal, LongSupplier
idGenerator, Persister<MapRecord<K, V>> persister, byte recordType,
Supplier<IOCompletion> completionSupplier, LongFunction<C> contextProvider,
IOCriticalErrorListener ioExceptionListener) {
+ this.collectionId = collectionId;
+ this.journal = journal;
+ this.idGenerator = idGenerator;
+ this.persister = persister;
+ this.recordType = recordType;
+ this.exceptionListener = ioExceptionListener;
+ this.completionSupplier = completionSupplier;
+ this.contextProvider = contextProvider;
+ }
+
+ C context;
+
+ LongFunction<C> contextProvider;
+
+ private final Persister<MapRecord<K, V>> persister;
+
+ private final Journal journal;
+
+ private final long collectionId;
+
+ private final byte recordType;
+
+ private final LongSupplier idGenerator;
+
+ private final Supplier<IOCompletion> completionSupplier;
+
+ private final IOCriticalErrorListener exceptionListener;
+
+ private final Map<K, MapRecord<K, V>> map = new HashMap<>();
+
+ public long getCollectionId() {
+ return collectionId;
+ }
+
+ @Override
+ public synchronized int size() {
+ return map.size();
+ }
+
+ public C getContext() {
+ if (context == null && contextProvider != null) {
+ context = contextProvider.apply(this.collectionId);
+ }
+ return context;
+ }
+
+ public JournalHashMap<K, V, C> setContext(C context) {
+ this.context = context;
+ return this;
+ }
+
+ @Override
+ public synchronized boolean isEmpty() {
+ return map.isEmpty();
+ }
+
+ @Override
+ public synchronized boolean containsKey(Object key) {
+ return map.containsKey(key);
+ }
+
+ @Override
+ public synchronized boolean containsValue(Object value) {
+ for (Entry<K, MapRecord<K, V>> entry : map.entrySet()) {
+ if (value.equals(entry.getValue().value)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public synchronized V get(Object key) {
+ MapRecord<K, V> reccord = map.get(key);
+ if (reccord == null) {
+ return null;
+ } else {
+ return reccord.value;
+ }
+ }
+
+ /** This is to be called from a single thread during reload, no need to be
synchronized */
+ public void reload(MapRecord<K, V> reloadValue) {
+ map.put(reloadValue.getKey(), reloadValue);
+ }
+
+ @Override
+ public synchronized V put(K key, V value) {
+ logger.debug("adding {} = {}", key, value);
+ long id = idGenerator.getAsLong();
+ MapRecord<K, V> record = new MapRecord(collectionId, id, key, value);
Review Comment:
Add diamond <> (or actual full type<K,V>) to suppress warnings. Here and
lots of other places, it looks like.
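For illustration, a small self-contained sketch of the difference. It uses java.util.AbstractMap.SimpleEntry in place of MapRecord, purely as an assumption so the snippet compiles on its own; DiamondSketch and its method names are hypothetical.

```java
import java.util.AbstractMap;
import java.util.Map;

class DiamondSketch {

   // Raw constructor call: it compiles, but without the annotation javac
   // would report rawtypes/unchecked warnings because the type arguments are dropped.
   @SuppressWarnings({"unchecked", "rawtypes"})
   static <K, V> Map.Entry<K, V> rawEntry(K key, V value) {
      Map.Entry<K, V> entry = new AbstractMap.SimpleEntry(key, value);
      return entry;
   }

   // Diamond: the compiler infers <K, V> from the target type, so no warning
   // and no suppression is needed.
   static <K, V> Map.Entry<K, V> typedEntry(K key, V value) {
      Map.Entry<K, V> entry = new AbstractMap.SimpleEntry<>(key, value);
      return entry;
   }
}
```

Both methods behave the same at runtime; the diamond form just keeps the compiler's type checking in place.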
##########
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JournalHashMap<K, V, C> implements Map<K, V> {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public static class MapRecord<K, V> implements Entry<K, V> {
Review Comment:
This Entry should be implementing equals and hashCode, especially given it
is actually being inserted into a HashMap
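A minimal sketch of what that could look like, following the equals and hashCode definitions documented on java.util.Map.Entry. RecordEntrySketch is a hypothetical stand-in for MapRecord. Whether collectionID and id should also take part in equality is a decision for the PR; this sketch assumes they should not, since the Entry contract only covers key and value.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for MapRecord; only key and value take part in equality,
// as the Map.Entry contract specifies.
class RecordEntrySketch<K, V> implements Map.Entry<K, V> {
   final long id;
   final K key;
   V value;

   RecordEntrySketch(long id, K key, V value) {
      this.id = id;
      this.key = key;
      this.value = value;
   }

   @Override
   public K getKey() {
      return key;
   }

   @Override
   public V getValue() {
      return value;
   }

   @Override
   public V setValue(V newValue) {
      V oldValue = this.value;
      this.value = newValue;
      return oldValue;
   }

   @Override
   public boolean equals(Object o) {
      if (this == o) {
         return true;
      }
      if (!(o instanceof Map.Entry)) {
         return false;
      }
      Map.Entry<?, ?> other = (Map.Entry<?, ?>) o;
      // null-safe comparison of key and value, as Map.Entry#equals requires
      return Objects.equals(key, other.getKey()) && Objects.equals(value, other.getValue());
   }

   @Override
   public int hashCode() {
      // XOR of the key and value hash codes, with null hashing to 0
      return Objects.hashCode(key) ^ Objects.hashCode(value);
   }
}
```

With this in place the entry compares equal to any other Map.Entry carrying the same key and value, for example an AbstractMap.SimpleEntry, which is what callers of entrySet() would normally expect.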
##########
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMapProvider.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.persistence.Persister;
+
+public class JournalHashMapProvider<K, V, C> {
+
+ final Journal journal;
+ final Persister<JournalHashMap.MapRecord<K, V>> persister;
+ final LongObjectHashMap<JournalHashMap<K, V, C>> journalMaps = new
LongObjectHashMap<>();
+ final LongSupplier idSupplier;
+ final byte recordType;
+ final IOCriticalErrorListener ioExceptionListener;
+ final Supplier<IOCompletion> ioCompletionSupplier;
+ final LongFunction<C> contextProvider;
+
+ public JournalHashMapProvider(LongSupplier idSupplier, Journal journal,
AbstractHashMapPersister persister, byte recordType, Supplier<IOCompletion>
ioCompletionSupplier, LongFunction<C> contextProvider, IOCriticalErrorListener
ioExceptionListener) {
+ this.idSupplier = idSupplier;
+ this.persister = persister;
+ this.journal = journal;
+ this.recordType = recordType;
+ this.ioExceptionListener = ioExceptionListener;
+ this.contextProvider = contextProvider;
+ this.ioCompletionSupplier = ioCompletionSupplier;
+ }
+
+ public List<JournalHashMap<K, V, C>> getMaps() {
+ ArrayList<JournalHashMap<K, V, C>> maps = new ArrayList<>();
+ journalMaps.values().forEach(maps::add);
+ return maps;
+ }
+
+ public void clear() {
+ journalMaps.clear();
+ }
+
+ public void reload(RecordInfo recordInfo) {
+ JournalHashMap.MapRecord<K, V> mapRecord =
persister.decode(recordInfo.wrapData(), null, null);
+ getMap(mapRecord.collectionID, null).reload(mapRecord);
+ }
+
+ public Iterator<JournalHashMap<K, V, C>> iterMaps() {
+ return journalMaps.values().iterator();
+ }
+
+ public JournalHashMap<K, V, C> getMap(long collectionID, C context) {
+ JournalHashMap<K, V, C> journalHashMap = journalMaps.get(collectionID);
+ if (journalHashMap == null) {
+ journalHashMap = new JournalHashMap<>(collectionID, journal,
idSupplier, persister, recordType, ioCompletionSupplier, contextProvider,
ioExceptionListener).setContext(context);
+ journalMaps.put(collectionID, journalHashMap);
+ }
+ return journalHashMap;
+ }
Review Comment:
Presumably this is only being used from a single thread?
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -617,4 +650,55 @@ public static void routeMirrorCommand(ActiveMQServer
server, Message message, Tr
server.getPostOffice().route(message, ctx, false);
}
+ class PagedRouteContext implements RouteContextList {
+
+ private final List<Queue> listRepresentation;
+ private final List<Queue> emptyList = Collections.emptyList();
+
+ PagedRouteContext() {
+ listRepresentation = new ArrayList<>(1);
+ listRepresentation.add(snfQueue);
Review Comment:
If the snfQueue durability is essentially fixed, can't we pre-determine the
answers/return values and simplify much of the method code?
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -281,6 +300,15 @@ public void sendMessage(Transaction tx, Message message,
RoutingContext context)
return;
}
+ if (snfQueue.getPagingStore().page(message, tx, pagedRouteContext,
this::copyMessageForPaging)) {
+ return;
+ }
+
+ if (message.isPaged()) {
+ // if the source was paged, we copy the message
+ message = copyMessageForPaging(message);
+ }
Review Comment:
Why?
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -281,6 +300,15 @@ public void sendMessage(Transaction tx, Message message,
RoutingContext context)
return;
}
+ if (snfQueue.getPagingStore().page(message, tx, pagedRouteContext,
this::copyMessageForPaging)) {
+ return;
+ }
Review Comment:
What is this doing? What did it do before?
Issue Time Tracking
-------------------
Worklog Id: (was: 906431)
Time Spent: 0.5h (was: 20m)
> Performance improvements on Mirror and Paging
> ---------------------------------------------
>
> Key: ARTEMIS-4651
> URL: https://issues.apache.org/jira/browse/ARTEMIS-4651
> Project: ActiveMQ Artemis
> Issue Type: Improvement
> Reporter: Clebert Suconic
> Assignee: Clebert Suconic
> Priority: Major
> Fix For: 2.33.0
>
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> Before this change, sends were not paged at the SNF. They are now copied.
> I also added a different scheme for retrying messages in batches. A
> collection with pending IDs is created and a few retries are performed at
> different levels.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)