[
https://issues.apache.org/jira/browse/ARTEMIS-4024?focusedWorklogId=814134&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-814134
]
ASF GitHub Bot logged work on ARTEMIS-4024:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 05/Oct/22 20:08
Start Date: 05/Oct/22 20:08
Worklog Time Spent: 10m
Work Description: clebertsuconic commented on code in PR #4240:
URL: https://github.com/apache/activemq-artemis/pull/4240#discussion_r986016994
##########
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/owleak/OWLeakTest.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.owleak;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.tests.soak.SoakTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.SpawnedVMSupport;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static
org.apache.activemq.artemis.tests.soak.TestParameters.intMandatoryProperty;
+import static
org.apache.activemq.artemis.tests.soak.TestParameters.testProperty;
+
+/**
+ * Refer to ./scripts/parameters.sh for suggested parameters
+ *
+ * Even though this test is not testing Paging, it will use Page just to
generate enough load to the server to compete for resources in Native Buffers.
+ *
+ */
+@RunWith(Parameterized.class)
+public class OWLeakTest extends SoakTestBase {
+
+ private static final int OK = 33; // arbitrary code. if the spawn returns
this the test went fine
+
+ public static final String SERVER_NAME_0 = "openwire-leaktest";
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final String TEST_NAME = "OW_LEAK";
+ private static final boolean TEST_ENABLED =
Boolean.parseBoolean(testProperty(TEST_NAME, "TEST_ENABLED", "true"));
+ private static final String PROTOCOL_LIST = testProperty(TEST_NAME,
"PROTOCOL_LIST", "OPENWIRE");
+ private final String protocol;
+ private final int NUMBER_OF_MESSAGES;
+ private final int PRODUCERS;
+ private final int MESSAGE_SIZE;
+ Process serverProcess;
+
+ public OWLeakTest(String protocol) {
+ this.protocol = protocol;
+ NUMBER_OF_MESSAGES = intMandatoryProperty(TEST_NAME, protocol +
"_NUMBER_OF_MESSAGES");
+ PRODUCERS = intMandatoryProperty(TEST_NAME, protocol + "_PRODUCERS");
+ MESSAGE_SIZE = intMandatoryProperty(TEST_NAME, protocol +
"_MESSAGE_SIZE");
+ }
+
+ @Parameterized.Parameters(name = "protocol={0}")
+ public static Collection<Object[]> parameters() {
+ String[] protocols = PROTOCOL_LIST.split(",");
+
+ ArrayList<Object[]> parameters = new ArrayList<>();
+ for (String str : protocols) {
+ logger.info("Adding {} to the list for the test", str);
+ parameters.add(new Object[]{str});
+ }
+
+ return parameters;
+ }
+
+ @Before
+ public void before() throws Exception {
+ Assume.assumeTrue(TEST_ENABLED);
+ cleanupData(SERVER_NAME_0);
+
+ serverProcess = startServer(SERVER_NAME_0, 0, 10_000);
+ }
+
+
+ public static void main(String[] arg) {
+ int PRODUCERS = Integer.parseInt(arg[0]);
+ int NUMBER_OF_MESSAGES = Integer.parseInt(arg[1]);
+ int MESSAGE_SIZE = Integer.parseInt(arg[2]);
+ String protocol = arg[3];
+ ExecutorService service = Executors.newFixedThreadPool(PRODUCERS + 1 +
1);
+
+ String QUEUE_NAME = "some_queue";
+
+ Semaphore semaphore = new Semaphore(PRODUCERS + 1);
+
+ CountDownLatch latch = new CountDownLatch(PRODUCERS + 1 + 1);
+
+ AtomicBoolean running = new AtomicBoolean(true);
+
+ AtomicInteger errors = new AtomicInteger(0);
+
+ try {
+
+ for (int i = 0; i < PRODUCERS; i++) {
+ final int t = i;
+ ConnectionFactory factory =
CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+ service.execute(() -> {
+ try {
+ for (int msg = 0; msg < NUMBER_OF_MESSAGES; msg++) {
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer =
session.createProducer(session.createQueue(QUEUE_NAME));
+ char[] msgStr = new char[MESSAGE_SIZE];
+ Arrays.fill(msgStr, 'a');
+ TextMessage message = session.createTextMessage(new
String(msgStr));
+ semaphore.acquire();
+ producer.send(message);
+ logger.info("Thread {} Sent message with size {} with the
total number of {} messages of {}", t, MESSAGE_SIZE, msg, NUMBER_OF_MESSAGES);
+ producer.close();
+ session.close();
+ connection.close();
+ }
+ } catch (Exception e) {
+ errors.incrementAndGet();
+ e.printStackTrace();
+ logger.warn(e.getMessage(), e);
+ } finally {
+ latch.countDown();
+ }
+ });
+ }
+
+
+ service.execute(() -> {
+
+ try {
+ ConnectionFactory factory =
CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer =
session.createConsumer(session.createQueue(QUEUE_NAME));
+ connection.start();
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES * PRODUCERS; i++) {
+
+ TextMessage message = (TextMessage)
consumer.receive(100_000);
+ Assert.assertNotNull(message);
+ logger.info("Received {} messages , total of {}", i,
(NUMBER_OF_MESSAGES * PRODUCERS));
+ semaphore.release();
+ }
+
+ } catch (Throwable e) {
+ errors.incrementAndGet();
+ logger.warn(e.getMessage(), e);
+ } finally {
+ running.set(false);
+ latch.countDown();
+ }
+ });
+
+ service.execute(() -> {
+ ConnectionFactory factory = CFUtil.createConnectionFactory("core",
"tcp://localhost:61616");
+ try {
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ MessageProducer producer =
session.createProducer(session.createQueue("fastQueue"));
+ MessageConsumer consumer =
session.createConsumer(session.createQueue("fastQueue"));
+ connection.start();
+ long msg = 0;
+ char[] msgStr = new char[1024];
+ String buffer = new String(msgStr);
+ Arrays.fill(msgStr, 'a');
+ while (running.get()) {
+ TextMessage message = session.createTextMessage(buffer);
+ producer.send(message);
+ if (++msg % 10000L == 0L) {
+ logger.info("Sent and receive {} fast messages", msg);
+ }
+
+ if (msg > 5000L) {
+ message = (TextMessage) consumer.receive(10000);
+ Assert.assertNotNull(message);
+ }
+
+ if (msg % 100L == 0L) {
+ session.commit();
+ }
+ }
+ session.commit();
+ producer.close();
+ consumer.close();
+ session.close();
+ connection.close();
+ } catch (Exception e) {
+ errors.incrementAndGet();
+ e.printStackTrace();
+ logger.warn(e.getMessage(), e);
+ } finally {
+ latch.countDown();
+ running.set(false);
+ }
+ });
+
+
+ Assert.assertTrue(latch.await(10, TimeUnit.DAYS));
Review Comment:
It was a test thing I forgot to rollback. Thanks.
Issue Time Tracking
-------------------
Worklog Id: (was: 814134)
Time Spent: 1h 20m (was: 1h 10m)
> Avoid excessive NativeMemory allocation when sending OpenWire Multi mega
> sized messages in openwire
> ---------------------------------------------------------------------------------------------------
>
> Key: ARTEMIS-4024
> URL: https://issues.apache.org/jira/browse/ARTEMIS-4024
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Reporter: Clebert Suconic
> Assignee: Robbie Gemmell
> Priority: Major
> Fix For: 2.27.0
>
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> when sending a large message in openwire, we will read the entire file on the
> memory, make the conversion from core, and send it on net
> throughOpenWireProtocolManager::sendPhisical.
> Such allocation should be limited and be sent in chunks.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)