[
https://issues.apache.org/jira/browse/ARTEMIS-4024?focusedWorklogId=814119&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-814119
]
ASF GitHub Bot logged work on ARTEMIS-4024:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 05/Oct/22 20:07
Start Date: 05/Oct/22 20:07
Worklog Time Spent: 10m
Work Description: gemmellr commented on code in PR #4240:
URL: https://github.com/apache/activemq-artemis/pull/4240#discussion_r985950972
##########
artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java:
##########
@@ -558,6 +564,21 @@ public void physicalSend(Command command) throws IOException {
}
+ private void chunkSend(final ByteSequence bytes, final int bufferSize, final int maxPacketSize) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Sending a big packet sized as {} with smaller packets of {}", bufferSize, maxPacketSize);
+ }
+ for (int posBuffer = 0; posBuffer < bufferSize; posBuffer += maxPacketSize) {
Review Comment:
It doesn't cause an issue here currently, but posBuffer will be left at the
wrong value after the last loop iteration in most cases (unless the buffer
size is an exact multiple of maxPacketSize), because the final increment
overshoots bufferSize. A while loop might be a nicer fit.
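For illustration, a minimal sketch of the while-loop shape. The class and
method names are hypothetical, and the sketch only computes the chunk ranges;
the PR itself writes each chunk to the transport connection.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the PR's code: computes {offset, length} pairs
// for each chunk instead of writing them to a transport.
final class ChunkSendSketch {

    static List<int[]> chunkRanges(int offset, int bufferSize, int maxChunkSize) {
        if (maxChunkSize <= 0) {
            throw new IllegalArgumentException("maxChunkSize must be positive");
        }
        List<int[]> ranges = new ArrayList<>();
        int sent = 0;
        while (sent < bufferSize) {
            // The last chunk may be shorter than maxChunkSize.
            int chunk = Math.min(maxChunkSize, bufferSize - sent);
            ranges.add(new int[] {offset + sent, chunk});
            // Advance by what was actually handled, so 'sent' never overshoots.
            sent += chunk;
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (int[] range : chunkRanges(0, 10, 4)) {
            System.out.println("offset=" + range[0] + " length=" + range[1]);
        }
    }
}
```

Because the counter advances by the size of the chunk just handled, it ends
exactly at bufferSize whether or not the size is a multiple of the chunk size.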
##########
artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java:
##########
@@ -546,9 +546,15 @@ public void physicalSend(Command command) throws IOException {
try {
final ByteSequence bytes = outWireFormat.marshal(command);
final int bufferSize = bytes.length;
- final ActiveMQBuffer buffer = transportConnection.createTransportBuffer(bufferSize);
- buffer.writeBytes(bytes.data, bytes.offset, bufferSize);
- transportConnection.write(buffer, false, false);
+ final int maxPacketSize = protocolManager.getOpenwireMaxPacketSize();
Review Comment:
I think the name of this could be better. getOpenwireMaxPacketSize() sounds
like an ultimate limit on the packet size, which it specifically isn't. It
is the threshold: packets *bigger than* the 'MaxPacketSize' get written in
chunks of up to 'MaxPacketSize' instead.
Perhaps OpenwireMaxPacketChunkSize?
##########
artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireFrameParser.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.openwire;
+
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+
+/** This MessageDecoder is based on LengthFieldBasedFrameDecoder.
+ * When OpenWire clients send a Large Message (large in the context of size only as openwire does not support message chunk streaming).
+ * In that context the server will transfer the huge frame to a Heap Buffer, instead of keeping a really large native buffer.
+ *
+ * There's a test showing this situation under ./soak-tests named OWLeakTest. The test will send 200MB messages. For every message sent we would have 200MB native buffers
+ * not leaving much space for the broker to handle its IO as most of the IO needs to be done with Native Memory.
+ * */
+public class OpenWireFrameParser extends ByteToMessageDecoder {
+
+
+ final int maxNativeSize;
+
+ public OpenWireFrameParser(int maxNativeSize) {
+ this.maxNativeSize = maxNativeSize;
+ }
+
+ ByteBuf outBuffer;
+ int bufferSize = -1;
+
+ @Override
+ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+ if (ctx.isRemoved()) {
+ return;
+ }
+
+ if (bufferSize == -1) {
+ if (in.readableBytes() < DataConstants.SIZE_INT) {
+ return;
+ }
+
+ bufferSize = in.getInt(in.readerIndex()) + DataConstants.SIZE_INT;
+
+ if (maxNativeSize > 0 && bufferSize > maxNativeSize) {
+ // we will use a heap buffer for large frames.
+ // to avoid competing for resources with the broker on native messages.
+ // to save the broker in case users send huge messages in openwire.
+ outBuffer = UnpooledByteBufAllocator.DEFAULT.heapBuffer(bufferSize);
Review Comment:
Might be worth adding a trace log indicating it happened.
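A rough sketch of what that could look like, under stated assumptions: it
uses java.nio.ByteBuffer and java.util.logging as stand-ins for the Netty
ByteBuf and the broker's logger, and the class and method names are
hypothetical. It peeks at the length prefix without consuming it, then logs
at the finest level when the heap path is chosen.

```java
import java.nio.ByteBuffer;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for the decision made in OpenWireFrameParser.decode().
final class FrameSizeSketch {

    private static final Logger logger = Logger.getLogger(FrameSizeSketch.class.getName());

    static final int SIZE_INT = Integer.BYTES;

    // Returns prefix + payload size, or -1 if the length prefix is not readable yet.
    static int peekFrameSize(ByteBuffer in) {
        if (in.remaining() < SIZE_INT) {
            return -1;
        }
        // Absolute read: the buffer position is left untouched.
        return in.getInt(in.position()) + SIZE_INT;
    }

    // A non-positive maxNativeSize disables the heap fallback.
    static boolean useHeapBuffer(int frameSize, int maxNativeSize) {
        boolean heap = maxNativeSize > 0 && frameSize > maxNativeSize;
        if (heap && logger.isLoggable(Level.FINEST)) {
            logger.finest("Frame of " + frameSize + " bytes exceeds maxNativeSize "
                + maxNativeSize + ", using a heap buffer");
        }
        return heap;
    }

    public static void main(String[] args) {
        ByteBuffer in = ByteBuffer.allocate(8);
        in.putInt(100).putInt(0);
        in.flip();
        int frameSize = peekFrameSize(in);
        System.out.println("frameSize=" + frameSize + " heap=" + useHeapBuffer(frameSize, 64));
    }
}
```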
##########
artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java:
##########
@@ -142,6 +145,18 @@ public class OpenWireProtocolManager extends AbstractProtocolManager<Command, O
private int openWireDestinationCacheSize = 16;
+ /** if defined, LargeMessages will be sent in chunks to the network.
+ * Notice that the system will still load the entire file in memory before sending on the stream.
+ * This should avoid just a big packet allocated. */
Review Comment:
This should avoid just a big _buffer being_ allocated.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java:
##########
@@ -70,8 +75,26 @@ private static Message asLargeMessage(Message message, StorageManager storageMan
LargeServerMessage lsm = storageManager.createLargeMessage(storageManager.generateID(), coreMessage);
ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
final int readableBytes = buffer.readableBytes();
- lsm.addBytes(buffer);
- lsm.releaseResources(true, true);
+
+ ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(CHUNK_LM_SIZE);
+ try {
+ ActiveMQBuffer wrappedBuffer = new ChannelBufferWrapper(buf);
+
+ // We write in chunks to avoid allocating a full NativeBody sized as the message size
+ // which might lead the broker out of resources
+ while (buffer.readableBytes() > 0) {
+ wrappedBuffer.writerIndex(0);
+ wrappedBuffer.readerIndex(0);
+ int bytesToRead = Math.min(CHUNK_LM_SIZE, buffer.readableBytes());
+ buffer.readBytes(wrappedBuffer, 0, bytesToRead);
Review Comment:
Couldn't the wrappedBuffer just play with the indices/limits to expose
'windows' of the original underlying "buffer", so that they can be added
directly to the LSM incrementally, rather than being copied out of it and then
immediately copied back into the LSM?
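A minimal sketch of the windowing idea, using java.nio.ByteBuffer as a
stand-in. That is an assumption for illustration only: the PR works with
ActiveMQBuffer and Netty ByteBuf, and whether the LSM can consume such views
directly is not shown here. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: exposes fixed-size views over one buffer without copying.
final class BufferWindowSketch {

    static List<ByteBuffer> windows(ByteBuffer source, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<ByteBuffer> result = new ArrayList<>();
        int pos = source.position();
        final int end = source.limit();
        while (pos < end) {
            int len = Math.min(chunkSize, end - pos);
            // duplicate() shares the bytes but has independent indices,
            // so the source buffer's position and limit are not disturbed.
            ByteBuffer view = source.duplicate();
            view.position(pos);
            view.limit(pos + len);
            // slice() yields a window starting at index 0 over the same storage.
            result.add(view.slice());
            pos += len;
        }
        return result;
    }

    public static void main(String[] args) {
        ByteBuffer source = ByteBuffer.wrap(new byte[10]);
        for (ByteBuffer window : windows(source, 4)) {
            System.out.println("window of " + window.remaining() + " bytes");
        }
    }
}
```

Each window shares storage with the source, so only indices are created per
chunk and no intermediate pooled buffer is needed.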
##########
tests/soak-tests/src/test/scripts/parameters.sh:
##########
@@ -0,0 +1,99 @@
+#!/bin/sh
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# this script contains a suggest set of variables to run the HorizontalPagingTest in a medium environment and hit some issues we used to have with paging
Review Comment:
Comment is a bit stale
##########
artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java:
##########
@@ -130,6 +129,10 @@ public class OpenWireProtocolManager extends AbstractProtocolManager<Command, O
private boolean openwireUseDuplicateDetectionOnFailover = true;
+ // if positive, packets will sent in chunks avoiding a single allocation
+ // this is to prevent large messages allocating really huge packets
Review Comment:
really huge _buffers_
Issue Time Tracking
-------------------
Worklog Id: (was: 814119)
Time Spent: 1h (was: 50m)
> Avoid excessive NativeMemory allocation when sending OpenWire Multi mega
> sized messages in openwire
> ---------------------------------------------------------------------------------------------------
>
> Key: ARTEMIS-4024
> URL: https://issues.apache.org/jira/browse/ARTEMIS-4024
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Reporter: Clebert Suconic
> Assignee: Robbie Gemmell
> Priority: Major
> Fix For: 2.27.0
>
> Time Spent: 1h
> Remaining Estimate: 0h
>
> When sending a large message in openwire, we read the entire file into
> memory, convert it from core, and send it over the network through
> OpenWireProtocolManager::sendPhisical.
> Such allocations should be limited, and the message should be sent in chunks.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)