gemmellr commented on a change in pull request #3876: URL: https://github.com/apache/activemq-artemis/pull/3876#discussion_r766530836
########## File path: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/BoundActor.java ########## @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils.actors; + +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.ToIntFunction; + +public class BoundActor<T> extends ProcessorBase<Object> { + + private static final AtomicIntegerFieldUpdater<BoundActor> SIZE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(BoundActor.class, "size"); + private volatile int size = 0; + + private static final AtomicIntegerFieldUpdater<BoundActor> SCHEDULED_FUSH_UPDATER = AtomicIntegerFieldUpdater.newUpdater(BoundActor.class, "scheduledFlush"); Review comment: Typo in name, FUSH ########## File path: artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/BoundActorTest.java ########## @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils.actors; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.utils.Wait; +import org.junit.Assert; +import org.junit.Test; + +public class BoundActorTest { + + Semaphore semaphore = new Semaphore(1); + AtomicInteger result = new AtomicInteger(0); + AtomicInteger lastProcessed = new AtomicInteger(0); + + @Test + public void limitedSize() throws Exception { + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + AtomicInteger timesOpen = new AtomicInteger(0); + AtomicInteger timesClose = new AtomicInteger(0); + AtomicBoolean open = new AtomicBoolean(true); + try { + semaphore.acquire(); + BoundActor<Integer> actor = new BoundActor<>(executorService, this::process, 10, (s) -> 1, () -> { + timesClose.incrementAndGet(); + open.set(false); + }, () -> { + timesOpen.incrementAndGet(); + open.set(true); + }); + + for (int i = 0; i < 10; i++) { + actor.act(i); + } + Assert.assertTrue(open.get()); + Assert.assertEquals(0, timesClose.get()); + actor.act(99); + Assert.assertEquals(1, timesClose.get()); Review comment: Newlines separating blocks of activity and related checks can be quite nice for readability. ########## File path: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/BoundActor.java ########## @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils.actors; + +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.ToIntFunction; + +public class BoundActor<T> extends ProcessorBase<Object> { Review comment: Using Bound in the name feels awkward when it isnt actually bound in the way most might expect the word to suggest. Though I dont necessarily have a great alternative to suggest. NotifyingActor? :) ########## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java ########## @@ -396,6 +396,12 @@ private Object loadSSLContext() { } } + + public int getTcpReceiveBufferSize() { + return tcpReceiveBufferSize; + } + Review comment: Superfluous newlines before+after. ########## File path: tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/paging/FloodServerWithAsyncSendTest.java ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.smoke.paging; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class FloodServerWithAsyncSendTest extends SmokeTestBase { + + public static final String SERVER_NAME_0 = "paging"; + + volatile boolean running = true; + + AtomicInteger errors = new AtomicInteger(0); + + @Before + public void before() throws Exception { + cleanupData(SERVER_NAME_0); + startServer(SERVER_NAME_0, 0, 30000); + } + + @Test + public void testAsyncPagingOpenWire() throws Exception { + String protocol = "OPENWIRE"; + internalTest(protocol); + + } + + ConnectionFactory newCF(String protocol) { + if (protocol.equalsIgnoreCase("OPENWIRE")) { + return CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616?jms.useAsyncSend=true"); + } else { + Assert.fail("unsuported protocol"); + return null; + } + } + + private void internalTest(String protocol) throws Exception { + + Thread consume1 = new Thread(() -> consume(protocol, "queue1"), "ProducerQueue1"); + consume1.start(); + Thread consume2 = new Thread(() -> consume(protocol, "queue2"), "ProducerQueue2"); + consume2.start(); + + Thread produce1 = new Thread(() -> produce(protocol, "queue1"), "ConsumerQueue1"); + produce1.start(); + Thread produce2 = new Thread(() -> produce(protocol, "queue2"), "ConsumerQueue2"); + produce2.start(); Review comment: Names look inverted, consumer using "ProducerQueue1", producer using "ConsumerQueue1" etc. Using an executor might be nicer in terms of readability and cleanup etc. ########## File path: tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/paging/FloodServerWithAsyncSendTest.java ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.smoke.paging; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class FloodServerWithAsyncSendTest extends SmokeTestBase { + + public static final String SERVER_NAME_0 = "paging"; + + volatile boolean running = true; + + AtomicInteger errors = new AtomicInteger(0); + + @Before + public void before() throws Exception { + cleanupData(SERVER_NAME_0); + startServer(SERVER_NAME_0, 0, 30000); + } + + @Test + public void testAsyncPagingOpenWire() throws Exception { + String protocol = "OPENWIRE"; + internalTest(protocol); + + } + + ConnectionFactory newCF(String protocol) { + if (protocol.equalsIgnoreCase("OPENWIRE")) { + return CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616?jms.useAsyncSend=true"); + } else { + Assert.fail("unsuported protocol"); + return null; + } + } + + private void internalTest(String protocol) throws Exception { + + Thread consume1 = new Thread(() -> consume(protocol, "queue1"), "ProducerQueue1"); + consume1.start(); + Thread consume2 = new Thread(() -> consume(protocol, "queue2"), "ProducerQueue2"); + consume2.start(); + + Thread produce1 = new Thread(() -> produce(protocol, "queue1"), "ConsumerQueue1"); + produce1.start(); + Thread produce2 = new Thread(() -> produce(protocol, "queue2"), "ConsumerQueue2"); + produce2.start(); + + Thread.sleep(10_000); + + running = false; + + consume1.join(); + consume2.join(); + produce1.join(); + produce2.join(); + + ConnectionFactory factory = newCF("openwire"); + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("queue3"); + MessageConsumer consumer = session.createConsumer(queue); + + MessageProducer producer = session.createProducer(queue); + + String random = RandomUtil.randomString(); + + producer.send(session.createTextMessage(random)); + TextMessage message = (TextMessage) consumer.receive(1000); + Assert.assertNotNull(message); + Assert.assertEquals(random, message.getText()); + connection.close(); + + Assert.assertEquals(0, errors.get()); + + } + + protected void consume(String protocol, String queueName) { + ConnectionFactory factory = newCF(protocol); + Connection connection = null; + try { + connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + MessageConsumer consumer = session.createConsumer(queue); + int rec = 0; + while (running) { + consumer.receive(5000); Review comment: If should probably check it actually gets something expected before counting it. This test could even pass without the threaded producers/consumers actually doing anything so long as they dont throw, only the third producer/consumers single message affects the result otherwise. ########## File path: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/BoundActor.java ########## @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils.actors; + +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.ToIntFunction; + +public class BoundActor<T> extends ProcessorBase<Object> { + + private static final AtomicIntegerFieldUpdater<BoundActor> SIZE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(BoundActor.class, "size"); + private volatile int size = 0; + + private static final AtomicIntegerFieldUpdater<BoundActor> SCHEDULED_FUSH_UPDATER = AtomicIntegerFieldUpdater.newUpdater(BoundActor.class, "scheduledFlush"); + private volatile int scheduledFlush = 0; + + private static final Object FLUSH = new Object(); + + + final int maxSize; + final ToIntFunction<T> sizeGetter; + ActorListener<T> listener; + + Runnable overLimit; + Runnable clearLimit; + + public BoundActor(Executor parent, ActorListener<T> listener, int maxSize, ToIntFunction<T> sizeGetter, Runnable overLimit, Runnable clearLimit) { + super(parent); + this.listener = listener; + this.maxSize = maxSize; + this.sizeGetter = sizeGetter; + this.overLimit = overLimit; + this.clearLimit = clearLimit; + } + + @Override + protected final void doTask(Object task) { + if (task == FLUSH) { + this.scheduledFlush = 0; + clearLimit.run(); + return; + } + try { + listener.onMessage((T)task); + } finally { + SIZE_UPDATER.getAndAdd(this, -sizeGetter.applyAsInt((T)task)); + } + } + + public void act(T message) { + int size = SIZE_UPDATER.addAndGet(this, (sizeGetter.applyAsInt((T)message))); Review comment: Are the (...) around the sizeGetter needed? Seems inconsistent with not having them in the method above. ########## File path: tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/paging/FloodServerWithAsyncSendTest.java ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.smoke.paging; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class FloodServerWithAsyncSendTest extends SmokeTestBase { + + public static final String SERVER_NAME_0 = "paging"; + + volatile boolean running = true; + + AtomicInteger errors = new AtomicInteger(0); + + @Before + public void before() throws Exception { + cleanupData(SERVER_NAME_0); + startServer(SERVER_NAME_0, 0, 30000); + } + + @Test + public void testAsyncPagingOpenWire() throws Exception { + String protocol = "OPENWIRE"; + internalTest(protocol); + + } + + ConnectionFactory newCF(String protocol) { + if (protocol.equalsIgnoreCase("OPENWIRE")) { + return CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616?jms.useAsyncSend=true"); + } else { + Assert.fail("unsuported protocol"); + return null; + } + } + + private void internalTest(String protocol) throws Exception { + + Thread consume1 = new Thread(() -> consume(protocol, "queue1"), "ProducerQueue1"); + consume1.start(); + Thread consume2 = new Thread(() -> consume(protocol, "queue2"), "ProducerQueue2"); + consume2.start(); + + Thread produce1 = new Thread(() -> produce(protocol, "queue1"), "ConsumerQueue1"); + produce1.start(); + Thread produce2 = new Thread(() -> produce(protocol, "queue2"), "ConsumerQueue2"); + produce2.start(); + + Thread.sleep(10_000); Review comment: This perhaps feels like it would be better as a soak test (even taking a good bit longer than this) rather than a smoke test needing to burn >10sec on every run. ########## File path: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/BoundActor.java ########## @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils.actors; + +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.ToIntFunction; + +public class BoundActor<T> extends ProcessorBase<Object> { + + private static final AtomicIntegerFieldUpdater<BoundActor> SIZE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(BoundActor.class, "size"); + private volatile int size = 0; + + private static final AtomicIntegerFieldUpdater<BoundActor> SCHEDULED_FUSH_UPDATER = AtomicIntegerFieldUpdater.newUpdater(BoundActor.class, "scheduledFlush"); + private volatile int scheduledFlush = 0; + + private static final Object FLUSH = new Object(); + + + final int maxSize; + final ToIntFunction<T> sizeGetter; + ActorListener<T> listener; + + Runnable overLimit; + Runnable clearLimit; + + public BoundActor(Executor parent, ActorListener<T> listener, int maxSize, ToIntFunction<T> sizeGetter, Runnable overLimit, Runnable clearLimit) { + super(parent); + this.listener = listener; + this.maxSize = maxSize; + this.sizeGetter = sizeGetter; + this.overLimit = overLimit; + this.clearLimit = clearLimit; + } + + @Override + protected final void doTask(Object task) { + if (task == FLUSH) { + this.scheduledFlush = 0; Review comment: Direct use of the variable can be confusing later when there is a FieldUpdater for it too, might be good to use that consistently. ########## File path: artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java ########## @@ -236,6 +238,18 @@ public void removeConnection(ConnectionInfo info, Throwable error) throws Invali } } + /*** if set, the OpenWire connection will bypass the tcpReadBuferSize and use this value instead. + * This is by default -1, and it should not be used unless in extreme situations like on a slow storage. */ + public int getMaxActorSize() { + return maxActorSize; + } + + public OpenWireProtocolManager setMaxActorSize(int maxActorSize) { + System.out.println("max actor size " + maxActorSize); Review comment: Leftover System.out.println ? / Use a logger. ########## File path: tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/paging/FloodServerWithAsyncSendTest.java ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.smoke.paging; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class FloodServerWithAsyncSendTest extends SmokeTestBase { + + public static final String SERVER_NAME_0 = "paging"; + + volatile boolean running = true; + + AtomicInteger errors = new AtomicInteger(0); + + @Before + public void before() throws Exception { + cleanupData(SERVER_NAME_0); + startServer(SERVER_NAME_0, 0, 30000); + } + + @Test + public void testAsyncPagingOpenWire() throws Exception { + String protocol = "OPENWIRE"; + internalTest(protocol); + + } + + ConnectionFactory newCF(String protocol) { + if (protocol.equalsIgnoreCase("OPENWIRE")) { + return CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616?jms.useAsyncSend=true"); + } else { + Assert.fail("unsuported protocol"); + return null; + } + } + + private void internalTest(String protocol) throws Exception { + + Thread consume1 = new Thread(() -> consume(protocol, "queue1"), "ProducerQueue1"); + consume1.start(); + Thread consume2 = new Thread(() -> consume(protocol, "queue2"), "ProducerQueue2"); + consume2.start(); + + Thread produce1 = new Thread(() -> produce(protocol, "queue1"), "ConsumerQueue1"); + produce1.start(); + Thread produce2 = new Thread(() -> produce(protocol, "queue2"), "ConsumerQueue2"); + produce2.start(); + + Thread.sleep(10_000); + + running = false; + + consume1.join(); + consume2.join(); + produce1.join(); + produce2.join(); + + ConnectionFactory factory = newCF("openwire"); + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("queue3"); + MessageConsumer consumer = session.createConsumer(queue); + + MessageProducer producer = session.createProducer(queue); + + String random = RandomUtil.randomString(); + + producer.send(session.createTextMessage(random)); + TextMessage message = (TextMessage) consumer.receive(1000); + Assert.assertNotNull(message); + Assert.assertEquals(random, message.getText()); + connection.close(); + + Assert.assertEquals(0, errors.get()); + + } + + protected void consume(String protocol, String queueName) { + ConnectionFactory factory = newCF(protocol); + Connection connection = null; + try { + connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + MessageConsumer consumer = session.createConsumer(queue); + int rec = 0; + while (running) { + consumer.receive(5000); + rec++; + if (rec % 10 == 0) { + System.out.println(queueName + " receive " + rec); + } + } + } catch (Throwable e) { + e.printStackTrace(); + errors.incrementAndGet(); + } finally { + try { + connection.close(); + } catch (Exception ignored) { + } + } + } + + protected void produce(String protocol, String queueName) { + + int produced = 0; + ConnectionFactory factory = newCF(protocol); + Connection connection = null; + try { + + connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + MessageProducer producer = session.createProducer(queue); + String randomString; + { + StringBuffer buffer = new StringBuffer(); + while (buffer.length() < 10000) { + buffer.append(RandomUtil.randomString()); + } + randomString = buffer.toString(); + } + + while (running) { + if (++produced % 10 == 0) { Review comment: Same as previous comment ########## File path: tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/paging/FloodServerWithAsyncSendTest.java ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.smoke.paging; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class FloodServerWithAsyncSendTest extends SmokeTestBase { + + public static final String SERVER_NAME_0 = "paging"; + + volatile boolean running = true; + + AtomicInteger errors = new AtomicInteger(0); + + @Before + public void before() throws Exception { + cleanupData(SERVER_NAME_0); + startServer(SERVER_NAME_0, 0, 30000); + } + + @Test + public void testAsyncPagingOpenWire() throws Exception { + String protocol = "OPENWIRE"; + internalTest(protocol); + + } + + ConnectionFactory newCF(String protocol) { + if (protocol.equalsIgnoreCase("OPENWIRE")) { + return CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616?jms.useAsyncSend=true"); + } else { + Assert.fail("unsuported protocol"); + return null; + } + } + + private void internalTest(String protocol) throws Exception { + + Thread consume1 = new Thread(() -> consume(protocol, "queue1"), "ProducerQueue1"); + consume1.start(); + Thread consume2 = new Thread(() -> consume(protocol, "queue2"), "ProducerQueue2"); + consume2.start(); + + Thread produce1 = new Thread(() -> produce(protocol, "queue1"), "ConsumerQueue1"); + produce1.start(); + Thread produce2 = new Thread(() -> produce(protocol, "queue2"), "ConsumerQueue2"); + produce2.start(); + + Thread.sleep(10_000); + + running = false; + + consume1.join(); + consume2.join(); + produce1.join(); + produce2.join(); + + ConnectionFactory factory = newCF("openwire"); + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("queue3"); + MessageConsumer consumer = session.createConsumer(queue); + + MessageProducer producer = session.createProducer(queue); + + String random = RandomUtil.randomString(); + + producer.send(session.createTextMessage(random)); + TextMessage message = (TextMessage) consumer.receive(1000); + Assert.assertNotNull(message); + Assert.assertEquals(random, message.getText()); + connection.close(); + + Assert.assertEquals(0, errors.get()); + + } + + protected void consume(String protocol, String queueName) { + ConnectionFactory factory = newCF(protocol); + Connection connection = null; + try { + connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + MessageConsumer consumer = session.createConsumer(queue); + int rec = 0; + while (running) { + consumer.receive(5000); + rec++; + if (rec % 10 == 0) { Review comment: Every 10 seems like it could be quite spammy, given the test length. Something larger, plus a log of the final total as it exits the loop (and perhaps when it throws), might seem nicer. ########## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java ########## @@ -713,6 +713,9 @@ public boolean checkMemory(boolean runOnFailure, final Runnable runWhenAvailable } } else if (pagingManager.isDiskFull() || addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && (maxSize != -1 || usingGlobalMaxSize)) { if (pagingManager.isDiskFull() || maxSize > 0 && sizeInBytes.get() >= maxSize || pagingManager.isGlobalFull()) { + if (runWhenBlocking != null) { + runWhenBlocking.run(); Review comment: Might be worth addressing https://issues.apache.org/jira/browse/ARTEMIS-3591 here too if this overall change raises the importance of when and how often the callbacks are being run. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
