[
https://issues.apache.org/jira/browse/ARTEMIS-5573?focusedWorklogId=1015503&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-1015503
]
ASF GitHub Bot logged work on ARTEMIS-5573:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 14/Apr/26 15:28
Start Date: 14/Apr/26 15:28
Worklog Time Spent: 10m
Work Description: gemmellr commented on code in PR #6323:
URL: https://github.com/apache/artemis/pull/6323#discussion_r3080401743
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV4.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.broker;
+
+import java.lang.invoke.MethodHandles;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
+import org.apache.activemq.artemis.utils.DataConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static
org.apache.activemq.artemis.core.persistence.PersisterIDs.AMQPMessagePersisterV4_ID;
+
+/**
+ * V4 adds a size field to determine persister boundaries, enabling
forward-compatible
+ * extensions without additional versioning.
+ **/
+public class AMQPMessagePersisterV4 extends AMQPMessagePersisterV3 {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public static final byte ID = AMQPMessagePersisterV4_ID;
+
+ public static AMQPMessagePersisterV4 theInstance;
+
+ public static AMQPMessagePersisterV4 getInstance() {
+ if (theInstance == null) {
+ theInstance = new AMQPMessagePersisterV4();
+ }
+ return theInstance;
+ }
+
+ @Override
+ public byte getID() {
+ return ID;
+ }
+
+ public AMQPMessagePersisterV4() {
+ super();
+ }
+
+
+ protected static final int PERSISTER_SIZE = DataConstants.SIZE_INT + //
memory estimate
+ DataConstants.SIZE_BYTE +
+ DataConstants.SIZE_BOOLEAN; // message priority
+
+ @Override
+ public int getEncodeSize(Message record) {
+ int encodeSize = super.getEncodeSize(record) + PERSISTER_SIZE +
DataConstants.SIZE_INT; // the size delimiter and whatever is written in encode
+ return encodeSize;
+ }
+
+
+ @Override
+ public void encode(ActiveMQBuffer buffer, Message record) {
+ super.encode(buffer, record);
+
+ writeSizeDelimiter(buffer);
+ buffer.writeInt(record.getMemoryEstimate());
+ buffer.writeByte(record.getPriority());
+ buffer.writeBoolean(record.isDurable());
+ }
+
+ protected void writeSizeDelimiter(ActiveMQBuffer buffer) {
+ // this is to allow us to determine the boundary of this persister, for
future use.
+ buffer.writeInt(PERSISTER_SIZE);
+ }
+
+ @Override
+ public Message decode(ActiveMQBuffer buffer, Message ignore,
CoreMessageObjectPools pool) {
+ Message record = super.decode(buffer, ignore, pool);
+
+ int sizePersister = buffer.readInt();
+ int lastPosition = buffer.readerIndex() + sizePersister;
+
+ {
+ AMQPStandardMessage standardMessage = (AMQPStandardMessage) record;
+ standardMessage.setMemoryEstimate(buffer.readInt());
+ standardMessage.reloadPriority(buffer.readByte());
+ standardMessage.reloadSetDurable(buffer.readBoolean());
+
+ assert buffer.readerIndex() <= lastPosition;
+ }
+
+ // if a future version of this persister wrote more bytes than what we
expected now, this will take care of skipping them
+ buffer.readerIndex(lastPosition);
+
+ // note that beyond V4 we are not calling scanAfterReload
Review Comment:
'for V4 and beyond' would be clearer, rather than just 'beyond V4'
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java:
##########
@@ -198,18 +207,23 @@ private static void checkCode(int code) {
protected int messageAnnotationsPosition = VALUE_NOT_PRESENT;
protected int propertiesPosition = VALUE_NOT_PRESENT;
protected int applicationPropertiesPosition = VALUE_NOT_PRESENT;
+ protected int applicationPropertiesCount;
protected int remainingBodyPosition = VALUE_NOT_PRESENT;
// Message level meta data
protected final long messageFormat;
protected long messageID;
protected SimpleString address;
- protected volatile int memoryEstimate = -1;
- protected volatile int originalEstimate = -1;
+ protected volatile int memoryEstimate = VALUE_NOT_PRESENT;
protected long expiration;
protected boolean expirationReload = false;
- protected long scheduledTime = -1;
- protected byte priority = DEFAULT_MESSAGE_PRIORITY;
+ protected long scheduledTime = VALUE_NOT_PRESENT;
+ protected byte priority = VALUE_NOT_PRESENT;
+
+ /**
+ * 0 = false, 1 = true, or VALUE_NOT_PRESENT
+ */
+ protected byte durable = VALUE_NOT_PRESENT;
Review Comment:
some constants would make this and the several related methods far more
readable
Issue Time Tracking
-------------------
Worklog Id: (was: 1015503)
Time Spent: 10h 40m (was: 10.5h)
> Make AMQP Size immutable
> ------------------------
>
> Key: ARTEMIS-5573
> URL: https://issues.apache.org/jira/browse/ARTEMIS-5573
> Project: Artemis
> Issue Type: Improvement
> Components: AMQP
> Affects Versions: 2.41.0
> Reporter: Clebert Suconic
> Assignee: Clebert Suconic
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10h 40m
> Remaining Estimate: 0h
>
> I have had a lot of issues, even recently on races between re-evaluating a
> message size in AMQP.
> Say a lazy decode happens at the wrong time and the memory estimates can be
> wrong.
> We have fixed issues along the years, but this is still a fragile process
> that is bound to fail. If an user for instance add a plugin breaking the
> chain of events.
> For that reason the memory estimate should already include enough estimation
> for any properties decoded and the process should be simplified.
> Less moving parts would mean less possibilities for bugs.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]