Author: cwiklik Date: Mon Aug 9 15:41:12 2010 New Revision: 983684 URL: http://svn.apache.org/viewvc?rev=983684&view=rev Log: UIMA-1855 Fixes inconsistent synchronization reported by Findbugs
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=983684&r1=983683&r2=983684&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original) +++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Mon Aug 9 15:41:12 2010 @@ -161,151 +161,154 @@ public class JmsEndpointConnection_impl openChannel(getServerUri(), componentName, endpoint, controller); } - private synchronized void openChannel(String brokerUri, String aComponentName, + private void openChannel(String brokerUri, String aComponentName, String anEndpointName, AnalysisEngineController aController) throws AsynchAEException, ServiceShutdownException { - try { - - // If replying to http request, reply to a queue managed by this service broker using tcp - // protocol - if (isReplyEndpoint && brokerUri.startsWith("http")) { - brokerUri = ((JmsOutputChannel) aController.getOutputChannel()).getServerURI(); - - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { - UIMAFramework.getLogger(CLASS_NAME).logrb( - Level.FINE, - CLASS_NAME.getName(), - "open", - JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_override_connection_to_endpoint__FINE", - new Object[] { aComponentName, getEndpoint(), - ((JmsOutputChannel) aController.getOutputChannel()).getServerURI() }); - } - } + synchronized (recoveryMux) { + try { - if (!isOpen()) { - Connection conn = null; - // Check connection status and create a new one (if necessary) as an atomic operation - try { - connectionSemaphore.acquire(); - if (connectionClosedOrFailed(brokerDestinations)) { - // Create one shared connection per unique brokerURL. - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), - "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_activemq_open__FINE", - new Object[] { aController.getComponentName(), anEndpointName, brokerUri }); - } - if ( brokerDestinations.getConnection() != null ) { - try { - // Close the connection to avoid leaks in the broker - brokerDestinations.getConnection().close(); - } catch( Exception e) { - // Ignore exceptions on a close of a bad connection - } - } - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri); - // Create shared jms connection to a broker - conn = factory.createConnection(); - factory.setDispatchAsync(true); - factory.setUseAsyncSend(true); - factory.setCopyMessageOnSend(false); - // Cache the connection. There should only be one connection in the jvm - // per unique broker url. - brokerDestinations.setConnection(conn); - // Close and invalidate all sessions previously created from the old connection - Iterator<Map.Entry<Object, JmsEndpointConnection_impl>> it = brokerDestinations.endpointMap - .entrySet().iterator(); - while (it.hasNext()) { - Map.Entry<Object, JmsEndpointConnection_impl> entry = it.next(); - if (entry.getValue().producerSession != null) { - // Close session - entry.getValue().producerSession.close(); - // Since we created a new connection invalidate session that - // have been created with the old connection - entry.getValue().producerSession = null; - } - } - } - } catch( Exception exc) { - throw exc; // rethrow - } finally { - connectionSemaphore.release(); - } - - connectionCreationTimestamp = System.nanoTime(); - failed = false; - } - Connection conn = brokerDestinations.getConnection(); - if (failed) { - // Unable to create a connection - return; - } - - producerSession = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); - - if ((delegateEndpoint.getCommand() == AsynchAEMessage.Stop || isReplyEndpoint) - && delegateEndpoint.getDestination() != null) { - producer = producerSession.createProducer(null); - if (aController != null) { - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), - "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_temp_conn_starting__FINE", - new Object[] { aComponentName, anEndpointName, brokerUri }); - } - } - } else { - destination = producerSession.createQueue(getEndpoint()); - producer = producerSession.createProducer(destination); - if (controller != null) { - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), - "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_conn_starting__FINE", - new Object[] { aComponentName, anEndpointName, brokerUri }); - } - } - } - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - // Since the connection is shared, start it only once - if (!((ActiveMQConnection) brokerDestinations.getConnection()).isStarted()) { - brokerDestinations.getConnection().start(); - } - if (controller != null) { - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), - "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_conn_started__FINE", new Object[] { endpoint, brokerUri }); - if (controller.getInputChannel() != null) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), - "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_connection_open_to_endpoint__FINE", - new Object[] { aComponentName, getEndpoint(), brokerUri }); - } - } - } - failed = false; - } catch (Exception e) { - boolean rethrow = true; - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), - "openChannel", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAEE_service_exception_WARNING", controller.getComponentName()); - - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), - "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_exception__WARNING", e); - } - - if (e instanceof JMSException) { - rethrow = handleJmsException((JMSException) e); - - } - if (rethrow) { - throw new AsynchAEException(e); - } - } + // If replying to http request, reply to a queue managed by this service broker using tcp + // protocol + if (isReplyEndpoint && brokerUri.startsWith("http")) { + brokerUri = ((JmsOutputChannel) aController.getOutputChannel()).getServerURI(); + + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { + UIMAFramework.getLogger(CLASS_NAME).logrb( + Level.FINE, + CLASS_NAME.getName(), + "open", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_override_connection_to_endpoint__FINE", + new Object[] { aComponentName, getEndpoint(), + ((JmsOutputChannel) aController.getOutputChannel()).getServerURI() }); + } + } + + if (!isOpen()) { + Connection conn = null; + // Check connection status and create a new one (if necessary) as an atomic operation + try { + connectionSemaphore.acquire(); + if (connectionClosedOrFailed(brokerDestinations)) { + // Create one shared connection per unique brokerURL. + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), + "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_activemq_open__FINE", + new Object[] { aController.getComponentName(), anEndpointName, brokerUri }); + } + if ( brokerDestinations.getConnection() != null ) { + try { + // Close the connection to avoid leaks in the broker + brokerDestinations.getConnection().close(); + } catch( Exception e) { + // Ignore exceptions on a close of a bad connection + } + } + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri); + // Create shared jms connection to a broker + conn = factory.createConnection(); + factory.setDispatchAsync(true); + factory.setUseAsyncSend(true); + factory.setCopyMessageOnSend(false); + // Cache the connection. There should only be one connection in the jvm + // per unique broker url. + brokerDestinations.setConnection(conn); + // Close and invalidate all sessions previously created from the old connection + Iterator<Map.Entry<Object, JmsEndpointConnection_impl>> it = brokerDestinations.endpointMap + .entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<Object, JmsEndpointConnection_impl> entry = it.next(); + if (entry.getValue().producerSession != null) { + // Close session + entry.getValue().producerSession.close(); + // Since we created a new connection invalidate session that + // have been created with the old connection + entry.getValue().producerSession = null; + } + } + } + } catch( Exception exc) { + throw exc; // rethrow + } finally { + connectionSemaphore.release(); + } + + connectionCreationTimestamp = System.nanoTime(); + failed = false; + } + Connection conn = brokerDestinations.getConnection(); + if (failed) { + // Unable to create a connection + return; + } + + producerSession = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); + + if ((delegateEndpoint.getCommand() == AsynchAEMessage.Stop || isReplyEndpoint) + && delegateEndpoint.getDestination() != null) { + producer = producerSession.createProducer(null); + if (aController != null) { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), + "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_temp_conn_starting__FINE", + new Object[] { aComponentName, anEndpointName, brokerUri }); + } + } + } else { + destination = producerSession.createQueue(getEndpoint()); + producer = producerSession.createProducer(destination); + if (controller != null) { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), + "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_conn_starting__FINE", + new Object[] { aComponentName, anEndpointName, brokerUri }); + } + } + } + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + // Since the connection is shared, start it only once + if (!((ActiveMQConnection) brokerDestinations.getConnection()).isStarted()) { + brokerDestinations.getConnection().start(); + } + if (controller != null) { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), + "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_conn_started__FINE", new Object[] { endpoint, brokerUri }); + if (controller.getInputChannel() != null) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), + "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_connection_open_to_endpoint__FINE", + new Object[] { aComponentName, getEndpoint(), brokerUri }); + } + } + } + failed = false; + } catch (Exception e) { + boolean rethrow = true; + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), + "openChannel", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAEE_service_exception_WARNING", controller.getComponentName()); + + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), + "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_exception__WARNING", e); + } + + if (e instanceof JMSException) { + rethrow = handleJmsException((JMSException) e); + + } + if (rethrow) { + throw new AsynchAEException(e); + } + } + + } } public synchronized void open() throws AsynchAEException, ServiceShutdownException { @@ -334,26 +337,28 @@ public class JmsEndpointConnection_impl } } - public synchronized void close() throws Exception { - if (producer != null) { - try { - producer.close(); - } catch (Exception e) { - // Ignore we are shutting down - } - } - if (producerSession != null) { - try { - producerSession.close(); - } catch (Exception e) { - // Ignore we are shutting down - } - producerSession = null; - } - if (destination != null) { - destination = null; - } - } + public void close() throws Exception { + synchronized (recoveryMux) { + if (producer != null) { + try { + producer.close(); + } catch (Exception e) { + // Ignore we are shutting down + } + } + if (producerSession != null) { + try { + producerSession.close(); + } catch (Exception e) { + // Ignore we are shutting down + } + producerSession = null; + } + if (destination != null) { + destination = null; + } + } + } protected String getEndpoint() { return endpoint;