[ 
https://issues.apache.org/jira/browse/ARTEMIS-5573?focusedWorklogId=1015503&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-1015503
 ]

ASF GitHub Bot logged work on ARTEMIS-5573:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Apr/26 15:28
            Start Date: 14/Apr/26 15:28
    Worklog Time Spent: 10m 
      Work Description: gemmellr commented on code in PR #6323:
URL: https://github.com/apache/artemis/pull/6323#discussion_r3080401743


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV4.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.broker;
+
+import java.lang.invoke.MethodHandles;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
+import org.apache.activemq.artemis.utils.DataConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.activemq.artemis.core.persistence.PersisterIDs.AMQPMessagePersisterV4_ID;
+
+/**
+ * V4 adds a size field to determine persister boundaries, enabling 
forward-compatible
+ * extensions without additional versioning.
+ **/
+public class AMQPMessagePersisterV4 extends AMQPMessagePersisterV3 {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   public static final byte ID = AMQPMessagePersisterV4_ID;
+
+   public static AMQPMessagePersisterV4 theInstance;
+
+   public static AMQPMessagePersisterV4 getInstance() {
+      if (theInstance == null) {
+         theInstance = new AMQPMessagePersisterV4();
+      }
+      return theInstance;
+   }
+
+   @Override
+   public byte getID() {
+      return ID;
+   }
+
+   public AMQPMessagePersisterV4() {
+      super();
+   }
+
+
+   protected static final int PERSISTER_SIZE = DataConstants.SIZE_INT + // 
memory estimate
+      DataConstants.SIZE_BYTE +
+      DataConstants.SIZE_BOOLEAN; // message priority
+
+   @Override
+   public int getEncodeSize(Message record) {
+      int encodeSize = super.getEncodeSize(record) + PERSISTER_SIZE + 
DataConstants.SIZE_INT; // the size delimiter and whatever is written in encode
+      return encodeSize;
+   }
+
+
+   @Override
+   public void encode(ActiveMQBuffer buffer, Message record) {
+      super.encode(buffer, record);
+
+      writeSizeDelimiter(buffer);
+      buffer.writeInt(record.getMemoryEstimate());
+      buffer.writeByte(record.getPriority());
+      buffer.writeBoolean(record.isDurable());
+   }
+
+   protected void writeSizeDelimiter(ActiveMQBuffer buffer) {
+      // this is to allow us to determine the boundary of this persister, for 
future use.
+      buffer.writeInt(PERSISTER_SIZE);
+   }
+
+   @Override
+   public Message decode(ActiveMQBuffer buffer, Message ignore, 
CoreMessageObjectPools pool) {
+      Message record = super.decode(buffer, ignore, pool);
+
+      int sizePersister = buffer.readInt();
+      int lastPosition = buffer.readerIndex() + sizePersister;
+
+      {
+         AMQPStandardMessage standardMessage = (AMQPStandardMessage) record;
+         standardMessage.setMemoryEstimate(buffer.readInt());
+         standardMessage.reloadPriority(buffer.readByte());
+         standardMessage.reloadSetDurable(buffer.readBoolean());
+
+         assert buffer.readerIndex() <= lastPosition;
+      }
+
+      // if a future version of this persister wrote more bytes than what we 
expected now, this will take care of skipping them
+      buffer.readerIndex(lastPosition);
+
+      // note that beyond V4 we are not calling scanAfterReload

Review Comment:
   'for V4 and beyond' would be clearer, rather than just 'beyond V4'



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java:
##########
@@ -198,18 +207,23 @@ private static void checkCode(int code) {
    protected int messageAnnotationsPosition = VALUE_NOT_PRESENT;
    protected int propertiesPosition = VALUE_NOT_PRESENT;
    protected int applicationPropertiesPosition = VALUE_NOT_PRESENT;
+   protected int applicationPropertiesCount;
    protected int remainingBodyPosition = VALUE_NOT_PRESENT;
 
    // Message level meta data
    protected final long messageFormat;
    protected long messageID;
    protected SimpleString address;
-   protected volatile int memoryEstimate = -1;
-   protected volatile int originalEstimate = -1;
+   protected volatile int memoryEstimate = VALUE_NOT_PRESENT;
    protected long expiration;
    protected boolean expirationReload = false;
-   protected long scheduledTime = -1;
-   protected byte priority = DEFAULT_MESSAGE_PRIORITY;
+   protected long scheduledTime = VALUE_NOT_PRESENT;
+   protected byte priority = VALUE_NOT_PRESENT;
+
+   /**
+    * 0 = false, 1 = true, or VALUE_NOT_PRESENT
+    */
+   protected byte durable = VALUE_NOT_PRESENT;

Review Comment:
   some constants would make this and the several related methods far more 
readable





Issue Time Tracking
-------------------

    Worklog Id:     (was: 1015503)
    Time Spent: 10h 40m  (was: 10.5h)

> Make AMQP Size immutable
> ------------------------
>
>                 Key: ARTEMIS-5573
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-5573
>             Project: Artemis
>          Issue Type: Improvement
>          Components: AMQP
>    Affects Versions: 2.41.0
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10h 40m
>  Remaining Estimate: 0h
>
> I have had a lot of issues, even recently on races between re-evaluating a 
> message size in AMQP.
> Say a lazy decode happens at the wrong time and the memory estimates can be 
> wrong.
> We have fixed issues along the years, but this is still a fragile process 
> that is bound to fail. If an user for instance add a plugin breaking the 
> chain of events.
> For that reason the memory estimate should already include enough estimation 
> for any properties decoded and the process should be simplified.
> Less moving parts would mean less possibilities for bugs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to