[ 
https://issues.apache.org/jira/browse/ARTEMIS-4024?focusedWorklogId=814119&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-814119
 ]

ASF GitHub Bot logged work on ARTEMIS-4024:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Oct/22 20:07
            Start Date: 05/Oct/22 20:07
    Worklog Time Spent: 10m 
      Work Description: gemmellr commented on code in PR #4240:
URL: https://github.com/apache/activemq-artemis/pull/4240#discussion_r985950972


##########
artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java:
##########
@@ -558,6 +564,21 @@ public void physicalSend(Command command) throws 
IOException {
 
    }
 
+   private void chunkSend(final ByteSequence bytes, final int bufferSize, 
final int maxPacketSize) {
+      if (logger.isTraceEnabled()) {
+         logger.trace("Sending a big packet sized as {} with smaller packets 
of {}", bufferSize, maxPacketSize);
+      }
+      for (int posBuffer = 0; posBuffer < bufferSize; posBuffer += 
maxPacketSize) {

Review Comment:
   It doesnt cause an issue here currently, but posBuffer will be set to the 
wrong value at the end of the last loop in most cases (unless the buffer is a 
perfect multiple of maxPacketSize). A while loop might be a nicer fit.



##########
artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java:
##########
@@ -546,9 +546,15 @@ public void physicalSend(Command command) throws 
IOException {
       try {
          final ByteSequence bytes = outWireFormat.marshal(command);
          final int bufferSize = bytes.length;
-         final ActiveMQBuffer buffer = 
transportConnection.createTransportBuffer(bufferSize);
-         buffer.writeBytes(bytes.data, bytes.offset, bufferSize);
-         transportConnection.write(buffer, false, false);
+         final int maxPacketSize = protocolManager.getOpenwireMaxPacketSize();

Review Comment:
   Think the name of this could be better, getOpenwireMaxPacketSize() sounds 
more like an ultimate limit on the packet size, whereas it specifically 
isnt....its the point that packets *bigger than* the 'MaxPacketSize' get 
written in chunks of up to 'MaxPacketSize' instead.
   
   Perhaps OpenwireMaxPacketChunkSize?



##########
artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireFrameParser.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.openwire;
+
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+
+/** This MessageDecoder is based on LengthFieldBasedFrameDecoder.
+ *   When OpenWire clients send a Large Message (large in the context of size 
only as openwire does not support message chunk streaming).
+ *   In that context the server will transfer the huge frame to a Heap Buffer, 
instead of keeping a really large native buffer.
+ *
+ *   There's a test showing this situation under ./soak-tests named 
OWLeakTest. The test will send 200MB messages. For every message sent we would 
have 200MB native buffers
+ *   not leaving much space for the broker to handle its IO as most of the IO 
needs to be done with Native Memory.
+ *  */
+public class OpenWireFrameParser extends ByteToMessageDecoder {
+
+
+   final int maxNativeSize;
+
+   public OpenWireFrameParser(int maxNativeSize) {
+      this.maxNativeSize = maxNativeSize;
+   }
+
+   ByteBuf outBuffer;
+   int bufferSize = -1;
+
+   @Override
+   protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> 
out) throws Exception {
+      if (ctx.isRemoved()) {
+         return;
+      }
+
+      if (bufferSize == -1) {
+         if (in.readableBytes() < DataConstants.SIZE_INT) {
+            return;
+         }
+
+         bufferSize = in.getInt(in.readerIndex()) + DataConstants.SIZE_INT;
+
+         if (maxNativeSize > 0 && bufferSize > maxNativeSize) {
+            // we will use a heap buffer for large frames.
+            // to avoid competing for resources with the broker on native 
messages.
+            // to save the broker in case users send huge messages in openwire.
+            outBuffer = 
UnpooledByteBufAllocator.DEFAULT.heapBuffer(bufferSize);

Review Comment:
   Might be worth adding a trace log indicating it happened.



##########
artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java:
##########
@@ -142,6 +145,18 @@ public class OpenWireProtocolManager  extends 
AbstractProtocolManager<Command, O
 
    private int openWireDestinationCacheSize = 16;
 
+   /** if defined, LargeMessages will be sent in chunks to the network.
+    * Notice that the system will still load the entire file in memory before 
sending on the stream.
+    * This should avoid just a big packet allocated. */

Review Comment:
   This should avoid just a big _buffer being_ allocated.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java:
##########
@@ -70,8 +75,26 @@ private static Message asLargeMessage(Message message, 
StorageManager storageMan
       LargeServerMessage lsm = 
storageManager.createLargeMessage(storageManager.generateID(), coreMessage);
       ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
       final int readableBytes = buffer.readableBytes();
-      lsm.addBytes(buffer);
-      lsm.releaseResources(true, true);
+
+      ByteBuf buf  = PooledByteBufAllocator.DEFAULT.buffer(CHUNK_LM_SIZE);
+      try {
+         ActiveMQBuffer wrappedBuffer = new ChannelBufferWrapper(buf);
+
+         // We write in chunks to avoid allocating a full NativeBody sized as 
the message size
+         // which might lead the broker out of resources
+         while (buffer.readableBytes() > 0) {
+            wrappedBuffer.writerIndex(0);
+            wrappedBuffer.readerIndex(0);
+            int bytesToRead = Math.min(CHUNK_LM_SIZE, buffer.readableBytes());
+            buffer.readBytes(wrappedBuffer, 0, bytesToRead);

Review Comment:
   Couldnt the wrappedBuffer just be playing with the indices/limits so that 
'windows' of the original underlying "buffer" so they can be added directly to 
the LSM incrementally rather than copied out of it and then immediately copied 
back into the LSM?



##########
tests/soak-tests/src/test/scripts/parameters.sh:
##########
@@ -0,0 +1,99 @@
+#!/bin/sh
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# this script contains a suggest set of variables to run the 
HorizontalPagingTest in a medium environment and hit some issues we used to 
have with paging

Review Comment:
   Comment is a bit stale



##########
artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java:
##########
@@ -130,6 +129,10 @@ public class OpenWireProtocolManager  extends 
AbstractProtocolManager<Command, O
 
    private boolean openwireUseDuplicateDetectionOnFailover = true;
 
+   // if positive, packets will sent in chunks avoiding a single allocation
+   // this is to prevent large messages allocating really huge packets

Review Comment:
   really huge _buffers_





Issue Time Tracking
-------------------

    Worklog Id:     (was: 814119)
    Time Spent: 1h  (was: 50m)

> Avoid excessive NativeMemory allocation when sending OpenWire Multi mega 
> sized messages in openwire
> ---------------------------------------------------------------------------------------------------
>
>                 Key: ARTEMIS-4024
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4024
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>            Reporter: Clebert Suconic
>            Assignee: Robbie Gemmell
>            Priority: Major
>             Fix For: 2.27.0
>
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> when sending a large message in openwire, we will read the entire file on the 
> memory, make the conversion from core, and send it on net 
> throughOpenWireProtocolManager::sendPhisical.
> Such allocation should be limited and be sent in chunks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to