[ 
https://issues.apache.org/jira/browse/ARTEMIS-2293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pavel Molchanov updated ARTEMIS-2293:
-------------------------------------
    Description: 
The catch (Exception e) block in the addPacket method does not call 
notifyAll(). As a result, other threads waiting in the waitCompletion 
method are not released.

[https://github.com/apache/activemq-artemis/blob/master/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java]

 

addPacket method:
{code:java}
public void addPacket(byte[] chunk, int flowControlSize, boolean isContinues) {
        int flowControlCredit = 0;
        
        synchronized (this) {
        packetAdded = true;
        if (outStream != null) {
        try {
        if (!isContinues) {
        streamEnded = true;
        }
        
        if (fileCache != null) {
        fileCache.cachePackage(chunk);
        }
        
        outStream.write(chunk);
        
        flowControlCredit = flowControlSize;
        
        notifyAll();
        
        if (streamEnded) {
        outStream.close();
        }
        } catch (Exception e) {
        ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
        handledException = e;
        }
        } else {
        if (fileCache != null) {
        try {
        fileCache.cachePackage(chunk);
        } catch (Exception e) {
        ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
        handledException = e;
        }
        }
        
        largeMessageData.offer(new LargeData(chunk, flowControlSize, 
isContinues));
        }
        }{code}
 

waitCompletion method:
{code:java}
public synchronized boolean waitCompletion(final long timeWait) throws 
ActiveMQException {
        if (outStream == null) {
        // There is no stream.. it will never achieve the end of streaming
        return false;
        }
        
        long timeOut;
        
        // If timeWait = 0, we will use the readTimeout
        // And we will check if no packets have arrived within readTimeout 
milliseconds
        if (timeWait != 0) {
        timeOut = System.currentTimeMillis() + timeWait;
        } else {
        timeOut = System.currentTimeMillis() + readTimeout;
        }
        
        while (!streamEnded && handledException == null) {
        try {
        this.wait(timeWait == 0 ? readTimeout : timeWait);
        } catch (InterruptedException e) {
        throw new ActiveMQInterruptedException(e);
        }
        
        if (!streamEnded && handledException == null) {
        if (timeWait != 0 && System.currentTimeMillis() > timeOut) {
        throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage();
        } else if (System.currentTimeMillis() > timeOut && !packetAdded) {
        throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage();
        }
        }
        }
        
        checkException();
        
        return streamEnded;
        
        }{code}
 

 

  was:
Block that handles exceptions in the catch(Exception e) doesn't call 
notifyAll(). That cause that other working threads are not released in the 
waitCompletion method.

 

addPacket method:
{code:java}
public void addPacket(byte[] chunk, int flowControlSize, boolean isContinues) {
        int flowControlCredit = 0;
        
        synchronized (this) {
        packetAdded = true;
        if (outStream != null) {
        try {
        if (!isContinues) {
        streamEnded = true;
        }
        
        if (fileCache != null) {
        fileCache.cachePackage(chunk);
        }
        
        outStream.write(chunk);
        
        flowControlCredit = flowControlSize;
        
        notifyAll();
        
        if (streamEnded) {
        outStream.close();
        }
        } catch (Exception e) {
        ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
        handledException = e;
        }
        } else {
        if (fileCache != null) {
        try {
        fileCache.cachePackage(chunk);
        } catch (Exception e) {
        ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
        handledException = e;
        }
        }
        
        largeMessageData.offer(new LargeData(chunk, flowControlSize, 
isContinues));
        }
        }{code}
 

waitCompletion method:
{code:java}

public synchronized boolean waitCompletion(final long timeWait) throws 
ActiveMQException {
        if (outStream == null) {
        // There is no stream.. it will never achieve the end of streaming
        return false;
        }
        
        long timeOut;
        
        // If timeWait = 0, we will use the readTimeout
        // And we will check if no packets have arrived within readTimeout 
milliseconds
        if (timeWait != 0) {
        timeOut = System.currentTimeMillis() + timeWait;
        } else {
        timeOut = System.currentTimeMillis() + readTimeout;
        }
        
        while (!streamEnded && handledException == null) {
        try {
        this.wait(timeWait == 0 ? readTimeout : timeWait);
        } catch (InterruptedException e) {
        throw new ActiveMQInterruptedException(e);
        }
        
        if (!streamEnded && handledException == null) {
        if (timeWait != 0 && System.currentTimeMillis() > timeOut) {
        throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage();
        } else if (System.currentTimeMillis() > timeOut && !packetAdded) {
        throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage();
        }
        }
        }
        
        checkException();
        
        return streamEnded;
        
        }{code}
 

 


> addPacket method in the 
> org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl  
> doesn't notify threads in case of an Exception
> -----------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: ARTEMIS-2293
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-2293
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>    Affects Versions: 2.6.4
>            Reporter: Pavel Molchanov
>            Priority: Major
>
> The catch (Exception e) block in the addPacket method does not call 
> notifyAll(). As a result, other threads waiting in the waitCompletion 
> method are not released.
> [https://github.com/apache/activemq-artemis/blob/master/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java]
>  
> addPacket method:
> {code:java}
> public void addPacket(byte[] chunk, int flowControlSize, boolean isContinues) 
> {
>       int flowControlCredit = 0;
>       
>       synchronized (this) {
>       packetAdded = true;
>       if (outStream != null) {
>       try {
>       if (!isContinues) {
>       streamEnded = true;
>       }
>       
>       if (fileCache != null) {
>       fileCache.cachePackage(chunk);
>       }
>       
>       outStream.write(chunk);
>       
>       flowControlCredit = flowControlSize;
>       
>       notifyAll();
>       
>       if (streamEnded) {
>       outStream.close();
>       }
>       } catch (Exception e) {
>       ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
>       handledException = e;
>       }
>       } else {
>       if (fileCache != null) {
>       try {
>       fileCache.cachePackage(chunk);
>       } catch (Exception e) {
>       ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
>       handledException = e;
>       }
>       }
>       
>       largeMessageData.offer(new LargeData(chunk, flowControlSize, 
> isContinues));
>       }
>       }{code}
>  
> waitCompletion method:
> {code:java}
> public synchronized boolean waitCompletion(final long timeWait) throws 
> ActiveMQException {
>       if (outStream == null) {
>       // There is no stream.. it will never achieve the end of streaming
>       return false;
>       }
>       
>       long timeOut;
>       
>       // If timeWait = 0, we will use the readTimeout
>       // And we will check if no packets have arrived within readTimeout 
> milliseconds
>       if (timeWait != 0) {
>       timeOut = System.currentTimeMillis() + timeWait;
>       } else {
>       timeOut = System.currentTimeMillis() + readTimeout;
>       }
>       
>       while (!streamEnded && handledException == null) {
>       try {
>       this.wait(timeWait == 0 ? readTimeout : timeWait);
>       } catch (InterruptedException e) {
>       throw new ActiveMQInterruptedException(e);
>       }
>       
>       if (!streamEnded && handledException == null) {
>       if (timeWait != 0 && System.currentTimeMillis() > timeOut) {
>       throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage();
>       } else if (System.currentTimeMillis() > timeOut && !packetAdded) {
>       throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage();
>       }
>       }
>       }
>       
>       checkException();
>       
>       return streamEnded;
>       
>       }{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to