[ 
https://issues.apache.org/jira/browse/ARTEMIS-4024?focusedWorklogId=814134&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-814134
 ]

ASF GitHub Bot logged work on ARTEMIS-4024:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Oct/22 20:08
            Start Date: 05/Oct/22 20:08
    Worklog Time Spent: 10m 
      Work Description: clebertsuconic commented on code in PR #4240:
URL: https://github.com/apache/activemq-artemis/pull/4240#discussion_r986016994


##########
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/owleak/OWLeakTest.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.owleak;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.tests.soak.SoakTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.SpawnedVMSupport;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.activemq.artemis.tests.soak.TestParameters.intMandatoryProperty;
+import static 
org.apache.activemq.artemis.tests.soak.TestParameters.testProperty;
+
+/**
+ * Refer to ./scripts/parameters.sh for suggested parameters
+ *
+ * Even though this test is not testing Paging, it will use Page just to 
generate enough load to the server to compete for resources in Native Buffers.
+ *
+ */
+@RunWith(Parameterized.class)
+public class OWLeakTest extends SoakTestBase {
+
+   private static final int OK = 33; // arbitrary code. if the spawn returns 
this the test went fine
+
+   public static final String SERVER_NAME_0 = "openwire-leaktest";
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+   private static final String TEST_NAME = "OW_LEAK";
+   private static final boolean TEST_ENABLED = 
Boolean.parseBoolean(testProperty(TEST_NAME, "TEST_ENABLED", "true"));
+   private static final String PROTOCOL_LIST = testProperty(TEST_NAME, 
"PROTOCOL_LIST", "OPENWIRE");
+   private final String protocol;
+   private final int NUMBER_OF_MESSAGES;
+   private final int PRODUCERS;
+   private final int MESSAGE_SIZE;
+   Process serverProcess;
+
+   public OWLeakTest(String protocol) {
+      this.protocol = protocol;
+      NUMBER_OF_MESSAGES = intMandatoryProperty(TEST_NAME, protocol + 
"_NUMBER_OF_MESSAGES");
+      PRODUCERS = intMandatoryProperty(TEST_NAME, protocol + "_PRODUCERS");
+      MESSAGE_SIZE = intMandatoryProperty(TEST_NAME, protocol + 
"_MESSAGE_SIZE");
+   }
+
+   @Parameterized.Parameters(name = "protocol={0}")
+   public static Collection<Object[]> parameters() {
+      String[] protocols = PROTOCOL_LIST.split(",");
+
+      ArrayList<Object[]> parameters = new ArrayList<>();
+      for (String str : protocols) {
+         logger.info("Adding {} to the list for the test", str);
+         parameters.add(new Object[]{str});
+      }
+
+      return parameters;
+   }
+
+   @Before
+   public void before() throws Exception {
+      Assume.assumeTrue(TEST_ENABLED);
+      cleanupData(SERVER_NAME_0);
+
+      serverProcess = startServer(SERVER_NAME_0, 0, 10_000);
+   }
+
+
+   public static void main(String[] arg) {
+      int PRODUCERS = Integer.parseInt(arg[0]);
+      int NUMBER_OF_MESSAGES = Integer.parseInt(arg[1]);
+      int MESSAGE_SIZE = Integer.parseInt(arg[2]);
+      String protocol = arg[3];
+      ExecutorService service = Executors.newFixedThreadPool(PRODUCERS + 1 + 
1);
+
+      String QUEUE_NAME = "some_queue";
+
+      Semaphore semaphore = new Semaphore(PRODUCERS + 1);
+
+      CountDownLatch latch = new CountDownLatch(PRODUCERS + 1 + 1);
+
+      AtomicBoolean running = new AtomicBoolean(true);
+
+      AtomicInteger errors = new AtomicInteger(0);
+
+      try {
+
+         for (int i = 0; i < PRODUCERS; i++) {
+            final int t = i;
+            ConnectionFactory factory = 
CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+            service.execute(() -> {
+               try {
+                  for (int msg = 0; msg < NUMBER_OF_MESSAGES; msg++) {
+                     Connection connection = factory.createConnection();
+                     Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                     MessageProducer producer = 
session.createProducer(session.createQueue(QUEUE_NAME));
+                     char[] msgStr = new char[MESSAGE_SIZE];
+                     Arrays.fill(msgStr, 'a');
+                     TextMessage message = session.createTextMessage(new 
String(msgStr));
+                     semaphore.acquire();
+                     producer.send(message);
+                     logger.info("Thread {} Sent message with size {} with the 
total number of {} messages of {}", t, MESSAGE_SIZE, msg, NUMBER_OF_MESSAGES);
+                     producer.close();
+                     session.close();
+                     connection.close();
+                  }
+               } catch (Exception e) {
+                  errors.incrementAndGet();
+                  e.printStackTrace();
+                  logger.warn(e.getMessage(), e);
+               } finally {
+                  latch.countDown();
+               }
+            });
+         }
+
+
+         service.execute(() -> {
+
+            try {
+               ConnectionFactory factory = 
CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+               Connection connection = factory.createConnection();
+               Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+               MessageConsumer consumer = 
session.createConsumer(session.createQueue(QUEUE_NAME));
+               connection.start();
+
+               for (int i = 0; i < NUMBER_OF_MESSAGES * PRODUCERS; i++) {
+
+                  TextMessage message = (TextMessage) 
consumer.receive(100_000);
+                  Assert.assertNotNull(message);
+                  logger.info("Received {} messages , total of {}", i, 
(NUMBER_OF_MESSAGES * PRODUCERS));
+                  semaphore.release();
+               }
+
+            } catch (Throwable e) {
+               errors.incrementAndGet();
+               logger.warn(e.getMessage(), e);
+            } finally {
+               running.set(false);
+               latch.countDown();
+            }
+         });
+
+         service.execute(() -> {
+            ConnectionFactory factory = CFUtil.createConnectionFactory("core", 
"tcp://localhost:61616");
+            try {
+               Connection connection = factory.createConnection();
+               Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+               MessageProducer producer = 
session.createProducer(session.createQueue("fastQueue"));
+               MessageConsumer consumer = 
session.createConsumer(session.createQueue("fastQueue"));
+               connection.start();
+               long msg = 0;
+               char[] msgStr = new char[1024];
+               String buffer = new String(msgStr);
+               Arrays.fill(msgStr, 'a');
+               while (running.get()) {
+                  TextMessage message = session.createTextMessage(buffer);
+                  producer.send(message);
+                  if (++msg % 10000L == 0L) {
+                     logger.info("Sent and receive {} fast messages", msg);
+                  }
+
+                  if (msg > 5000L) {
+                     message = (TextMessage) consumer.receive(10000);
+                     Assert.assertNotNull(message);
+                  }
+
+                  if (msg % 100L == 0L) {
+                     session.commit();
+                  }
+               }
+               session.commit();
+               producer.close();
+               consumer.close();
+               session.close();
+               connection.close();
+            } catch (Exception e) {
+               errors.incrementAndGet();
+               e.printStackTrace();
+               logger.warn(e.getMessage(), e);
+            } finally {
+               latch.countDown();
+               running.set(false);
+            }
+         });
+
+
+         Assert.assertTrue(latch.await(10, TimeUnit.DAYS));

Review Comment:
   It was a test thing I forgot to roll back.  Thanks.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 814134)
    Time Spent: 1h 20m  (was: 1h 10m)

> Avoid excessive NativeMemory allocation when sending OpenWire Multi mega 
> sized messages in openwire
> ---------------------------------------------------------------------------------------------------
>
>                 Key: ARTEMIS-4024
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4024
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>            Reporter: Clebert Suconic
>            Assignee: Robbie Gemmell
>            Priority: Major
>             Fix For: 2.27.0
>
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> When sending a large message in OpenWire, we read the entire file into 
> memory, convert it from Core, and send it over the network through 
> OpenWireProtocolManager::sendPhisical.
> Such allocation should be limited, and the message should be sent in chunks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to