[ https://issues.apache.org/jira/browse/ARTEMIS-4771?focusedWorklogId=920697&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-920697 ]
ASF GitHub Bot logged work on ARTEMIS-4771: ------------------------------------------- Author: ASF GitHub Bot Created on: 23/May/24 15:26 Start Date: 23/May/24 15:26 Worklog Time Spent: 10m Work Description: gemmellr commented on code in PR #4942: URL: https://github.com/apache/activemq-artemis/pull/4942#discussion_r1611907602 ########## artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageWriterTest.java: ########## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.protocol.amqp.proton; + +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import org.apache.activemq.artemis.core.message.LargeBodyReader; +import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPLargeMessage; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; +import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection; +import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.engine.Session; +import org.apache.qpid.proton.engine.impl.TransportImpl; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.mockito.Spy; + +/** + * Tests for the AMQP Large Message Writer + */ +public class AMQPLargeMessageWriterTest { + + @Mock + ProtonServerSenderContext serverSender; + + @Mock + Sender protonSender; + + @Mock + Session protonSession; + + @Mock + Connection protonConnection; + + @Mock + TransportImpl protonTransport; + + @Mock + Delivery protonDelivery; + + @Mock + MessageReference reference; + + @Mock + AMQPLargeMessage message; + + @Mock + LargeBodyReader bodyReader; + + @Mock + AMQPConnectionContext connectionContext; + + @Mock + AMQPSessionContext sessionContext; + + @Mock + AMQPSessionCallback sessionSPI; + + @Mock + org.apache.activemq.artemis.spi.core.remoting.Connection transportConnection; + + @Mock + ActiveMQProtonRemotingConnection remotingConnection; + + @Spy + NullStorageManager nullStoreManager = new NullStorageManager(); + + @Before + public void setUp() { + MockitoAnnotations.openMocks(this); + + when(serverSender.getSessionContext()).thenReturn(sessionContext); + when(serverSender.getSender()).thenReturn(protonSender); + when(serverSender.createDelivery(any(), anyInt())).thenReturn(protonDelivery); + + when(protonSender.getSession()).thenReturn(protonSession); + when(protonSession.getConnection()).thenReturn(protonConnection); + when(protonConnection.getTransport()).thenReturn(protonTransport); + when(protonTransport.getOutboundFrameSizeLimit()).thenReturn(65535); + + when(transportConnection.getProtocolConnection()).thenReturn(remotingConnection); + when(reference.getMessage()).thenReturn(message); + when(message.isLargeMessage()).thenReturn(true); + when(message.getLargeBodyReader()).thenReturn(bodyReader); + when(sessionContext.getSessionSPI()).thenReturn(sessionSPI); + when(sessionContext.getAMQPConnectionContext()).thenReturn(connectionContext); + when(connectionContext.flowControl(any())).thenReturn(true); + when(sessionSPI.getStorageManager()).thenReturn(nullStoreManager); + when(sessionSPI.getTransportConnection()).thenReturn(transportConnection); + } + + @Test + public void testWriterThrowsIllegalStateIfNotOpenedWhenWriteCalled() throws Exception { + AMQPLargeMessageWriter writer = new AMQPLargeMessageWriter(serverSender); + + try { + writer.writeBytes(reference); + fail("Should throw as the writer was not opened."); + } catch (IllegalStateException e) { + // Expected + } + } + + @Test + public void testNoWritesWhenProtonSenderIsLocallyClosed() throws Exception { + AMQPLargeMessageWriter writer = new AMQPLargeMessageWriter(serverSender); + + when(protonSender.getLocalState()).thenReturn(EndpointState.CLOSED); + + writer.open(reference); + + try { + writer.writeBytes(reference); + } catch (IllegalStateException e) { + fail("Should not throw as link was closed before write actioned."); + } + + verify(reference).getMessage(); + verify(message).usageUp(); + verify(protonSender).getLocalState(); + + verifyNoMoreInteractions(reference); + verifyNoInteractions(protonDelivery); + } + + @Test + public void testLargeMessageUsageLoweredOnCloseWhenWriteNotCompleted() throws Exception { + AMQPLargeMessageWriter writer = new AMQPLargeMessageWriter(serverSender); + + writer.open(reference); + + when(protonSender.getLocalState()).thenReturn(EndpointState.ACTIVE); + when(protonDelivery.isPartial()).thenReturn(true); + + // The writer will wait for flow control to resume it + when(connectionContext.flowControl(any())).thenReturn(false); + + verify(message).usageUp(); + + try { + writer.writeBytes(reference); + } catch (IllegalStateException e) { + fail("Should not throw as the delivery is completed so no data should be written."); + } + + verify(message, Mockito.never()).usageDown(); + verify(reference).getMessage(); + verifyNoMoreInteractions(reference); + + try { + writer.close(); + } catch (IllegalStateException e) { + fail("Should not throw as the close when write wasn't completed."); + } + + verify(message).usageDown(); + verify(protonSender).getSession(); + verify(protonDelivery).getTag(); + verify(protonSender, atLeastOnce()).getLocalState(); + + verifyNoMoreInteractions(reference); + verifyNoMoreInteractions(protonDelivery); + } + + @Test + public void testTryDeliveringRunAfterClosedDoesNotThrow() throws Exception { + AMQPLargeMessageWriter writer = new AMQPLargeMessageWriter(serverSender); + + writer.open(reference); + + when(protonSender.getLocalState()).thenReturn(EndpointState.ACTIVE); + when(protonDelivery.isPartial()).thenReturn(true); + + // The writer will wait for flow control to resume it + when(connectionContext.flowControl(any())).then(invocation -> { + ReadyListener listener = invocation.getArgument(0); + + writer.close(); + + try { + listener.readyForWriting(); + } catch (Exception e) { + fail("Pending deliver should no-op if closed before completion."); + } + + return false; + }); + + doAnswer(invocation -> { + Runnable runnable = invocation.getArgument(0); + + try { + runnable.run(); + } catch (Exception e) { + fail("Queued invocation of writer tasks should not throw."); + } + + return null; + }).when(connectionContext).runLater(any(Runnable.class)); Review Comment: Checkstyle failed here, missing 1 space. Issue Time Tracking ------------------- Worklog Id: (was: 920697) Time Spent: 2h 50m (was: 2h 40m) > NPE between AMQPLargeMessageWriter::tryDelivering and resetClose > ---------------------------------------------------------------- > > Key: ARTEMIS-4771 > URL: https://issues.apache.org/jira/browse/ARTEMIS-4771 > Project: ActiveMQ Artemis > Issue Type: Bug > Reporter: Clebert Suconic > Priority: Major > Time Spent: 2h 50m > Remaining Estimate: 0h > > This is using RedHat's bits: > java.lang.NullPointerException: Cannot invoke > "org.apache.qpid.proton.engine.Delivery.getTag()" because "this.delivery" is > null > at > org.apache.activemq.artemis.protocol.amqp.proton.AMQPLargeMessageWriter.tryDelivering(AMQPLargeMessageWriter.java:174) > ~[artemis-amqp-protocol-2.33.0.redhat-00009.jar:2.33.0.redhat-00009] > at > io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173) > ~[netty-common-4.1.108.Final-redhat-00001.jar:4.1.108.Final-redhat-00001] > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166) > [netty-common-4.1.108.Final-redhat-00001.jar:4.1.108.Final-redhat-00001] > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470) > [netty-common-4.1.108.Final-redhat-00001.jar:4.1.108.Final-redhat-00001] > at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:413) > [netty-transport-classes-epoll-4.1.108.Final-redhat-00001.jar:4.1.108.Final-redhat-00001] > at > io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) > [netty-common-4.1.108.Final-redhat-00001.jar:4.1.108.Final-redhat-00001] > at > io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) > [netty-common-4.1.108.Final-redhat-00001.jar:4.1.108.Final-redhat-00001] > at > org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118) > [artemis-commons-2.33.0.redhat-00009.jar:2.33.0.redhat-00009] -- This message was sent by Atlassian Jira (v8.20.10#820010)