Author: cwiklik
Date: Mon Aug  9 15:41:12 2010
New Revision: 983684

URL: http://svn.apache.org/viewvc?rev=983684&view=rev
Log:
UIMA-1855 Fixes inconsistent synchronization reported by Findbugs

Modified:
    
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java

Modified: 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=983684&r1=983683&r2=983684&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
 Mon Aug  9 15:41:12 2010
@@ -161,151 +161,154 @@ public class JmsEndpointConnection_impl 
     openChannel(getServerUri(), componentName, endpoint, controller);
   }
 
-  private synchronized void openChannel(String brokerUri, String 
aComponentName,
+  private void openChannel(String brokerUri, String aComponentName,
           String anEndpointName, AnalysisEngineController aController) throws 
AsynchAEException,
           ServiceShutdownException {
-    try {
-
-      // If replying to http request, reply to a queue managed by this service 
broker using tcp
-      // protocol
-      if (isReplyEndpoint && brokerUri.startsWith("http")) {
-        brokerUri = ((JmsOutputChannel) 
aController.getOutputChannel()).getServerURI();
-
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(
-                  Level.FINE,
-                  CLASS_NAME.getName(),
-                  "open",
-                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                  "UIMAJMS_override_connection_to_endpoint__FINE",
-                  new Object[] { aComponentName, getEndpoint(),
-                    ((JmsOutputChannel) 
aController.getOutputChannel()).getServerURI() });
-        }
-      }
+         synchronized (recoveryMux) {
+                   try {
 
-      if (!isOpen()) {
-        Connection conn = null;
-        //  Check connection status and create a new one (if necessary) as an 
atomic operation
-        try {
-          connectionSemaphore.acquire();
-          if (connectionClosedOrFailed(brokerDestinations)) {
-            // Create one shared connection per unique brokerURL.
-            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, 
CLASS_NAME.getName(),
-                      "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                      "UIMAJMS_activemq_open__FINE",
-                      new Object[] { aController.getComponentName(), 
anEndpointName, brokerUri });
-            }
-            if ( brokerDestinations.getConnection() != null ) {
-              try {
-                //  Close the connection to avoid leaks in the broker
-                brokerDestinations.getConnection().close();
-              } catch( Exception e) {
-                //  Ignore exceptions on a close of a bad connection
-              }
-            }
-            ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(brokerUri);
-            //  Create shared jms connection to a broker
-            conn = factory.createConnection();
-            factory.setDispatchAsync(true);
-            factory.setUseAsyncSend(true);
-            factory.setCopyMessageOnSend(false);
-            //  Cache the connection. There should only be one connection in 
the jvm
-            //  per unique broker url. 
-            brokerDestinations.setConnection(conn);
-            // Close and invalidate all sessions previously created from the 
old connection
-            Iterator<Map.Entry<Object, JmsEndpointConnection_impl>> it = 
brokerDestinations.endpointMap
-                    .entrySet().iterator();
-            while (it.hasNext()) {
-              Map.Entry<Object, JmsEndpointConnection_impl> entry = it.next();
-              if (entry.getValue().producerSession != null) {
-                // Close session
-                entry.getValue().producerSession.close();
-                // Since we created a new connection invalidate session that
-                // have been created with the old connection
-                entry.getValue().producerSession = null;
-              }
-            }
-          }
-        } catch( Exception exc) {
-          throw exc; // rethrow
-        } finally {
-          connectionSemaphore.release();
-        }
-        
-        connectionCreationTimestamp = System.nanoTime();
-        failed = false;
-      }
-      Connection conn = brokerDestinations.getConnection();
-      if (failed) {
-        // Unable to create a connection
-        return;
-      }
-
-      producerSession = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
-
-      if ((delegateEndpoint.getCommand() == AsynchAEMessage.Stop || 
isReplyEndpoint)
-              && delegateEndpoint.getDestination() != null) {
-        producer = producerSession.createProducer(null);
-        if (aController != null) {
-          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, 
CLASS_NAME.getName(),
-                    "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAJMS_temp_conn_starting__FINE",
-                    new Object[] { aComponentName, anEndpointName, brokerUri 
});
-          }
-        }
-      } else {
-        destination = producerSession.createQueue(getEndpoint());
-        producer = producerSession.createProducer(destination);
-        if (controller != null) {
-          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, 
CLASS_NAME.getName(),
-                    "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAJMS_conn_starting__FINE",
-                    new Object[] { aComponentName, anEndpointName, brokerUri 
});
-          }
-        }
-      }
-      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-      // Since the connection is shared, start it only once
-      if (!((ActiveMQConnection) 
brokerDestinations.getConnection()).isStarted()) {
-        brokerDestinations.getConnection().start();
-      }
-      if (controller != null) {
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, 
CLASS_NAME.getName(),
-                  "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                  "UIMAJMS_conn_started__FINE", new Object[] { endpoint, 
brokerUri });
-          if (controller.getInputChannel() != null) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, 
CLASS_NAME.getName(),
-                    "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAJMS_connection_open_to_endpoint__FINE",
-                    new Object[] { aComponentName, getEndpoint(), brokerUri });
-          }
-        }
-      }
-      failed = false;
-    } catch (Exception e) {
-      boolean rethrow = true;
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
CLASS_NAME.getName(),
-                "openChannel", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAEE_service_exception_WARNING", 
controller.getComponentName());
-        
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
CLASS_NAME.getName(),
-                "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAJMS_exception__WARNING", e);
-      }
-
-      if (e instanceof JMSException) {
-        rethrow = handleJmsException((JMSException) e);
-
-      }
-      if (rethrow) {
-        throw new AsynchAEException(e);
-      }
-    }
+                       // If replying to http request, reply to a queue 
managed by this service broker using tcp
+                       // protocol
+                       if (isReplyEndpoint && brokerUri.startsWith("http")) {
+                         brokerUri = ((JmsOutputChannel) 
aController.getOutputChannel()).getServerURI();
+
+                         if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+                           UIMAFramework.getLogger(CLASS_NAME).logrb(
+                                   Level.FINE,
+                                   CLASS_NAME.getName(),
+                                   "open",
+                                   JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                                   
"UIMAJMS_override_connection_to_endpoint__FINE",
+                                   new Object[] { aComponentName, 
getEndpoint(),
+                                     ((JmsOutputChannel) 
aController.getOutputChannel()).getServerURI() });
+                         }
+                       }
+
+                       if (!isOpen()) {
+                         Connection conn = null;
+                         //  Check connection status and create a new one (if 
necessary) as an atomic operation
+                         try {
+                           connectionSemaphore.acquire();
+                           if (connectionClosedOrFailed(brokerDestinations)) {
+                             // Create one shared connection per unique 
brokerURL.
+                             if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+                               
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+                                       "openChannel", 
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                                       "UIMAJMS_activemq_open__FINE",
+                                       new Object[] { 
aController.getComponentName(), anEndpointName, brokerUri });
+                             }
+                             if ( brokerDestinations.getConnection() != null ) 
{
+                               try {
+                                 //  Close the connection to avoid leaks in 
the broker
+                                 brokerDestinations.getConnection().close();
+                               } catch( Exception e) {
+                                 //  Ignore exceptions on a close of a bad 
connection
+                               }
+                             }
+                             ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(brokerUri);
+                             //  Create shared jms connection to a broker
+                             conn = factory.createConnection();
+                             factory.setDispatchAsync(true);
+                             factory.setUseAsyncSend(true);
+                             factory.setCopyMessageOnSend(false);
+                             //  Cache the connection. There should only be 
one connection in the jvm
+                             //  per unique broker url. 
+                             brokerDestinations.setConnection(conn);
+                             // Close and invalidate all sessions previously 
created from the old connection
+                             Iterator<Map.Entry<Object, 
JmsEndpointConnection_impl>> it = brokerDestinations.endpointMap
+                                     .entrySet().iterator();
+                             while (it.hasNext()) {
+                               Map.Entry<Object, JmsEndpointConnection_impl> 
entry = it.next();
+                               if (entry.getValue().producerSession != null) {
+                                 // Close session
+                                 entry.getValue().producerSession.close();
+                                 // Since we created a new connection 
invalidate session that
+                                 // have been created with the old connection
+                                 entry.getValue().producerSession = null;
+                               }
+                             }
+                           }
+                         } catch( Exception exc) {
+                           throw exc; // rethrow
+                         } finally {
+                           connectionSemaphore.release();
+                         }
+                         
+                         connectionCreationTimestamp = System.nanoTime();
+                         failed = false;
+                       }
+                       Connection conn = brokerDestinations.getConnection();
+                       if (failed) {
+                         // Unable to create a connection
+                         return;
+                       }
+
+                       producerSession = conn.createSession(false, 
Session.DUPS_OK_ACKNOWLEDGE);
+
+                       if ((delegateEndpoint.getCommand() == 
AsynchAEMessage.Stop || isReplyEndpoint)
+                               && delegateEndpoint.getDestination() != null) {
+                         producer = producerSession.createProducer(null);
+                         if (aController != null) {
+                           if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+                             
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+                                     "openChannel", 
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                                     "UIMAJMS_temp_conn_starting__FINE",
+                                     new Object[] { aComponentName, 
anEndpointName, brokerUri });
+                           }
+                         }
+                       } else {
+                         destination = 
producerSession.createQueue(getEndpoint());
+                         producer = 
producerSession.createProducer(destination);
+                         if (controller != null) {
+                           if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+                             
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+                                     "openChannel", 
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                                     "UIMAJMS_conn_starting__FINE",
+                                     new Object[] { aComponentName, 
anEndpointName, brokerUri });
+                           }
+                         }
+                       }
+                       producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                       // Since the connection is shared, start it only once
+                       if (!((ActiveMQConnection) 
brokerDestinations.getConnection()).isStarted()) {
+                         brokerDestinations.getConnection().start();
+                       }
+                       if (controller != null) {
+                         if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+                           
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+                                   "openChannel", 
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                                   "UIMAJMS_conn_started__FINE", new Object[] 
{ endpoint, brokerUri });
+                           if (controller.getInputChannel() != null) {
+                             
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+                                     "openChannel", 
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                                     
"UIMAJMS_connection_open_to_endpoint__FINE",
+                                     new Object[] { aComponentName, 
getEndpoint(), brokerUri });
+                           }
+                         }
+                       }
+                       failed = false;
+                     } catch (Exception e) {
+                       boolean rethrow = true;
+                       if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+                         
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+                                 "openChannel", 
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                                 "UIMAEE_service_exception_WARNING", 
controller.getComponentName());
+                         
+                         
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+                                 "openChannel", 
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                                 "UIMAJMS_exception__WARNING", e);
+                       }
+
+                       if (e instanceof JMSException) {
+                         rethrow = handleJmsException((JMSException) e);
+
+                       }
+                       if (rethrow) {
+                         throw new AsynchAEException(e);
+                       }
+                     }
+                 
+         }
   }
 
   public synchronized void open() throws AsynchAEException, 
ServiceShutdownException {
@@ -334,26 +337,28 @@ public class JmsEndpointConnection_impl 
     }
   }
 
-  public synchronized void close() throws Exception {
-    if (producer != null) {
-      try {
-        producer.close();
-      } catch (Exception e) {
-        // Ignore we are shutting down
-      }
-    }
-    if (producerSession != null) {
-      try {
-        producerSession.close();
-      } catch (Exception e) {
-        // Ignore we are shutting down
-      }
-      producerSession = null;
-    }
-    if (destination != null) {
-      destination = null;
-    }
-  }
+       public void close() throws Exception {
+               synchronized (recoveryMux) {
+                       if (producer != null) {
+                               try {
+                                       producer.close();
+                               } catch (Exception e) {
+                                       // Ignore we are shutting down
+                               }
+                       }
+                       if (producerSession != null) {
+                               try {
+                                       producerSession.close();
+                               } catch (Exception e) {
+                                       // Ignore we are shutting down
+                               }
+                               producerSession = null;
+                       }
+                       if (destination != null) {
+                               destination = null;
+                       }
+               }
+       }
 
   protected String getEndpoint() {
     return endpoint;


Reply via email to