Author: mriou
Date: Wed Jul 29 23:11:01 2009
New Revision: 799118

URL: http://svn.apache.org/viewvc?rev=799118&view=rev
Log:
ODE-628 Some DBMS (like Ingres) don't support updates on queries with a join, so we 
have to use subqueries instead

Modified:
    
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/SessionManager.java
    
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionFactoryImpl.java
    
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java

Modified: 
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/SessionManager.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/SessionManager.java?rev=799118&r1=799117&r2=799118&view=diff
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/SessionManager.java
 (original)
+++ 
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/SessionManager.java
 Wed Jul 29 23:11:01 2009
@@ -27,6 +27,7 @@
 import org.hibernate.Session;
 import org.hibernate.SessionFactory;
 import org.hibernate.cfg.Configuration;
+import org.hibernate.cfg.Environment;
 
 import javax.sql.DataSource;
 import javax.transaction.TransactionManager;
@@ -43,83 +44,112 @@
  * managing sessions.
  */
 public class SessionManager {
-  private static final Log __log = LogFactory.getLog(SessionManager.class);
+    private static final Log __log = LogFactory.getLog(SessionManager.class);
 
-  public static final String PROP_GUID = "ode.hibernate.guid";
-  
-  private static final Map<String, TransactionManager> _txManagers =
-    Collections.synchronizedMap(new HashMap<String, TransactionManager>());
-  private static final Map<String, DataSource> _dataSources = 
-    Collections.synchronizedMap(new HashMap<String,DataSource>());
-
-  private final String _uuid = new UUID().toString();
-  private final TransactionManager _txManager;
-  private final SessionFactory _sessionFactory;
-
-  /** Inaccessible constructor. */
-  public SessionManager(Properties env, DataSource ds, TransactionManager tx) 
throws HibernateException {
-    if(tx == null)
-      throw new IllegalArgumentException("Null transaction manager");
-
-    _txManager = tx;
-    _txManagers.put(_uuid,tx);
-    _dataSources.put(_uuid,ds);
-    
-    _sessionFactory = getDefaultConfiguration()
-            .setProperties(env)
-            .setProperty(PROP_GUID, _uuid)
-            .buildSessionFactory();
-  }
-
-  TransactionManager getTransactionManager() {
-    return _txManager;
-  }
-
-  public static void registerTransactionManager(String uuid, 
TransactionManager txm) {
-       _txManagers.put(uuid, txm);
-  }
-
-  /**
-   * Get the current Hibernate Session.
-   */
-  public Session getSession() {
-    return _sessionFactory.getCurrentSession();
-  }
-  
-  /**
-   * Returns a hibernate configuration with hibernate DAO objects added as 
resources.
-   * @return
-   * @throws MappingException
-   */
-  public Configuration getDefaultConfiguration() throws MappingException {
-    return new Configuration()
-            .addClass(HProcess.class)
-            .addClass(HProcessInstance.class)
-            .addClass(HCorrelator.class)
-            .addClass(HCorrelatorMessage.class)
-            .addClass(HCorrelationProperty.class)
-            .addClass(HCorrelatorSelector.class)
-            .addClass(HMessageExchange.class)
-            .addClass(HMessage.class)
-            .addClass(HPartnerLink.class)
-            .addClass(HScope.class)
-            .addClass(HCorrelationSet.class)
-            .addClass(HXmlData.class)
-            .addClass(HVariableProperty.class)
-            .addClass(HBpelEvent.class)
-               .addClass(HFaultData.class)
-               .addClass(HActivityRecovery.class)
-            .addClass(HLargeData.class)
-            .addClass(HMessageExchangeProperty.class);
-  }
-
-  public static TransactionManager getTransactionManager(Properties props) {
-    String guid = props.getProperty(PROP_GUID);
-    return _txManagers.get(guid);
-  }
-
-  public static Connection getConnection(Properties props) throws SQLException 
{
-    String guid = props.getProperty(PROP_GUID);
-    return _dataSources.get(guid).getConnection();
-  }
+    public static final String PROP_GUID = "ode.hibernate.guid";
+
+    private static final Map<String, TransactionManager> _txManagers =
+            Collections.synchronizedMap(new HashMap<String, 
TransactionManager>());
+    private static final Map<String, DataSource> _dataSources =
+            Collections.synchronizedMap(new HashMap<String,DataSource>());
+
+    private static final String[] CANNOT_JOIN_FOR_UPDATE_DIALECTS =
+            {"org.hibernate.dialect.IngresDialect"};
+
+    private final String _uuid = new UUID().toString();
+    private final TransactionManager _txManager;
+    private final SessionFactory _sessionFactory;
+    private boolean _canJoinForUpdate = true;
+
+    /** Inaccessible constructor. */
+    public SessionManager(Properties env, DataSource ds, TransactionManager 
tx) throws HibernateException {
+        if(tx == null)
+            throw new IllegalArgumentException("Null transaction manager");
+
+        _txManager = tx;
+        _txManagers.put(_uuid,tx);
+        _dataSources.put(_uuid,ds);
+
+        _sessionFactory = getDefaultConfiguration()
+                .setProperties(env)
+                .setProperty(PROP_GUID, _uuid)
+                .buildSessionFactory();
+
+        /*
+        Some Hibernate dialects (like IngresDialect) do not support update for 
join.
+        We need to distinguish them and explicitly define subqueries, 
otherwise Hibernate
+        implicitly generates joins which causes problems during update for 
such DBMS.
+        See org.apache.ode.daohib.bpel.CorrelatorDaoImpl for instance.
+        */
+        String currentHibDialect = env.getProperty(Environment.DIALECT);
+        for (String dialect : CANNOT_JOIN_FOR_UPDATE_DIALECTS) {
+            if (dialect.equals(currentHibDialect)) {
+                _canJoinForUpdate = false;
+            }
+        }
+    }
+
+    TransactionManager getTransactionManager() {
+        return _txManager;
+    }
+
+    public static void registerTransactionManager(String uuid, 
TransactionManager txm) {
+        _txManagers.put(uuid, txm);
+    }
+
+    /**
+     * Get the current Hibernate Session.
+     */
+    public Session getSession() {
+        return _sessionFactory.getCurrentSession();
+    }
+
+    /**
+     * Returns a flag which shows whether " where .. join ... for update" kind 
of queries can be used (supported
+     * by currently effective {@link org.hibernate.dialect.Dialect}). If 
it's {@code false} then sub-query fallback
+     * should be invoked instead.
+     *
+     * @return currently returns false only for {@link 
org.hibernate.dialect.IngresDialect}
+     */
+    public boolean canJoinForUpdate() {
+        return _canJoinForUpdate;
+    }
+
+
+    /**
+     * Returns a hibernate configuration with hibernate DAO objects added as 
resources.
+     * @return
+     * @throws MappingException
+     */
+    public Configuration getDefaultConfiguration() throws MappingException {
+        return new Configuration()
+                .addClass(HProcess.class)
+                .addClass(HProcessInstance.class)
+                .addClass(HCorrelator.class)
+                .addClass(HCorrelatorMessage.class)
+                .addClass(HCorrelationProperty.class)
+                .addClass(HCorrelatorSelector.class)
+                .addClass(HMessageExchange.class)
+                .addClass(HMessage.class)
+                .addClass(HPartnerLink.class)
+                .addClass(HScope.class)
+                .addClass(HCorrelationSet.class)
+                .addClass(HXmlData.class)
+                .addClass(HVariableProperty.class)
+                .addClass(HBpelEvent.class)
+                .addClass(HFaultData.class)
+                .addClass(HActivityRecovery.class)
+                .addClass(HLargeData.class)
+                .addClass(HMessageExchangeProperty.class);
+    }
+
+    public static TransactionManager getTransactionManager(Properties props) {
+        String guid = props.getProperty(PROP_GUID);
+        return _txManagers.get(guid);
+    }
+
+    public static Connection getConnection(Properties props) throws 
SQLException {
+        String guid = props.getProperty(PROP_GUID);
+        return _dataSources.get(guid).getConnection();
+    }
 }

Modified: 
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionFactoryImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionFactoryImpl.java?rev=799118&r1=799117&r2=799118&view=diff
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionFactoryImpl.java
 (original)
+++ 
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionFactoryImpl.java
 Wed Jul 29 23:11:01 2009
@@ -132,9 +132,9 @@
         }
         _sessionManager = createSessionManager(properties, _ds, _tm);
     }
-    
+
     protected SessionManager createSessionManager(Properties properties, 
DataSource ds, TransactionManager tm) {
-       return new SessionManager(properties, ds, tm);
+        return new SessionManager(properties, ds, tm);
     }
 
     private static final String DEFAULT_HIBERNATE_DIALECT = 
"org.hibernate.dialect.DerbyDialect";
@@ -160,6 +160,8 @@
         // Oracle 8 and Oracle >8
         HIBERNATE_DIALECTS.put("Apache Derby", new 
DialectFactory.VersionInsensitiveMapper(
                 "org.hibernate.dialect.DerbyDialect"));
+        HIBERNATE_DIALECTS.put("INGRES", new 
DialectFactory.VersionInsensitiveMapper(
+                "org.hibernate.dialect.IngresDialect"));
     }
 
     public void shutdown() {

Modified: 
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java?rev=799118&r1=799117&r2=799118&view=diff
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
 (original)
+++ 
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
 Wed Jul 29 23:11:01 2009
@@ -47,6 +47,8 @@
     /** filter for finding a matching selector. */
     private static final String LOCK_SELECTORS = "update from 
HCorrelatorSelector as hs set hs.lock = hs.lock+1 where hs.processType = 
:processType";
     private static final String FLTR_SELECTORS = "from HCorrelatorSelector as 
hs where hs.processType = :processType and hs.correlator.correlatorId = 
:correlatorId";
+    private static final String FLTR_SELECTORS_SUBQUERY = ("from 
HCorrelatorSelector as hs where hs.processType = :processType and 
hs.correlatorId = " +
+            "(select hc.id from HCorrelator as hc where hc.correlatorId = 
:correlatorId )").intern();
 
     /** Query for removing routes. */
     private static final String QRY_DELSELECTORS = "delete from 
HCorrelatorSelector where groupId = ? and instance = ?";
@@ -62,19 +64,19 @@
     @SuppressWarnings("unchecked")
     public MessageExchangeDAO dequeueMessage(CorrelationKeySet keySet) {
         entering("CorrelatorDaoImpl.dequeueMessage");
-        
+
         MessageExchangeDAO mex = null;
-        
+
         String hdr = "dequeueMessage(" + keySet + "): ";
         __log.debug(hdr);
 
         List<CorrelationKeySet> subSets = keySet.findSubSets();
-        Query qry = getSession().createFilter(_hobj.getMessageCorrelations(), 
-                       generateUnmatchedQuery(subSets));
-       for( int i = 0; i < subSets.size(); i++ ) {
-               qry.setString("s" + i, subSets.get(i).toCanonicalString());
-       }
-        
+        Query qry = getSession().createFilter(_hobj.getMessageCorrelations(),
+                generateUnmatchedQuery(subSets));
+        for( int i = 0; i < subSets.size(); i++ ) {
+            qry.setString("s" + i, subSets.get(i).toCanonicalString());
+        }
+
         // We really should consider the possibility of multiple messages 
matching a criteria.
         // When the message is handled, its not too convenient to attempt to 
determine if the
         // received message conflicts with one already received.
@@ -84,15 +86,15 @@
                 if (__log.isDebugEnabled())
                     __log.debug(hdr + "did not find a MESSAGE entry.");
             } else {
-                   HCorrelatorMessage mcor = (HCorrelatorMessage) mcors.next();
+                HCorrelatorMessage mcor = (HCorrelatorMessage) mcors.next();
                 if (__log.isDebugEnabled())
-                       __log.debug(hdr + "found MESSAGE entry " + 
mcor.getMessageExchange());
-                   mex = new MessageExchangeDaoImpl(_sm, 
mcor.getMessageExchange());
+                    __log.debug(hdr + "found MESSAGE entry " + 
mcor.getMessageExchange());
+                mex = new MessageExchangeDaoImpl(_sm, 
mcor.getMessageExchange());
             }
         } finally {
             Hibernate.close(mcors);
         }
-        
+
         return mex;
     }
 
@@ -106,8 +108,8 @@
 
         String processType = new QName(_hobj.getProcess().getTypeNamespace(), 
_hobj.getProcess().getTypeName()).toString();
         List<CorrelationKeySet> subSets = keySet.findSubSets();
-        
-        Query q = 
getSession().createQuery(generateSelectorQuery(FLTR_SELECTORS, subSets));
+
+        Query q = 
getSession().createQuery(generateSelectorQuery(_sm.canJoinForUpdate() ? 
FLTR_SELECTORS : FLTR_SELECTORS_SUBQUERY, subSets));
         q.setString("processType", processType);
         q.setString("correlatorId", _hobj.getCorrelatorId());
 
@@ -120,7 +122,7 @@
         List<HProcessInstance> targets = new ArrayList<HProcessInstance>();
         for (HCorrelatorSelector selector : 
(List<HCorrelatorSelector>)q.list()) {
             if (selector != null) {
-                boolean isRoutePolicyOne = selector.getRoute() == null || 
"one".equals(selector.getRoute()); 
+                boolean isRoutePolicyOne = selector.getRoute() == null || 
"one".equals(selector.getRoute());
                 if ("all".equals(selector.getRoute()) ||
                         (isRoutePolicyOne && 
!targets.contains(selector.getInstance()))) {
                     routes.add(new MessageRouteDaoImpl(_sm, selector));
@@ -135,43 +137,43 @@
     }
 
     private String generateUnmatchedQuery(List<CorrelationKeySet> subSets) {
-       StringBuffer filterQuery = new StringBuffer();
-       
-       if( subSets.size() == 1 ) {
-               filterQuery.append(" where this.correlationKey = :s0");
-       } else if( subSets.size() > 1 ) {
-               filterQuery.append(" where this.correlationKey in(");
-               for( int i = 0; i < subSets.size(); i++ ) {
-                       if( i > 0 ) {
-                               filterQuery.append(", ");
-                       }
-                       filterQuery.append(":s").append(i);
-               }
-               filterQuery.append(")");
-       }
-       
-       return filterQuery.toString();
+        StringBuffer filterQuery = new StringBuffer();
+
+        if( subSets.size() == 1 ) {
+            filterQuery.append(" where this.correlationKey = :s0");
+        } else if( subSets.size() > 1 ) {
+            filterQuery.append(" where this.correlationKey in(");
+            for( int i = 0; i < subSets.size(); i++ ) {
+                if( i > 0 ) {
+                    filterQuery.append(", ");
+                }
+                filterQuery.append(":s").append(i);
+            }
+            filterQuery.append(")");
+        }
+
+        return filterQuery.toString();
     }
-    
+
     private String generateSelectorQuery(String header, 
List<CorrelationKeySet> subSets) {
-       StringBuffer filterQuery = new StringBuffer(header);
-       
-       if( subSets.size() == 1 ) {
-               filterQuery.append(" and hs.correlationKey = :s0");
-       } else if( subSets.size() > 1 ) {
-               filterQuery.append(" and hs.correlationKey in(");
-               for( int i = 0; i < subSets.size(); i++ ) {
-                       if( i > 0 ) {
-                               filterQuery.append(", ");
-                       }
-                       filterQuery.append(":s").append(i);
-               }
-               filterQuery.append(")");
-       }
-       
-       return filterQuery.toString();
+        StringBuffer filterQuery = new StringBuffer(header);
+
+        if( subSets.size() == 1 ) {
+            filterQuery.append(" and hs.correlationKey = :s0");
+        } else if( subSets.size() > 1 ) {
+            filterQuery.append(" and hs.correlationKey in(");
+            for( int i = 0; i < subSets.size(); i++ ) {
+                if( i > 0 ) {
+                    filterQuery.append(", ");
+                }
+                filterQuery.append(":s").append(i);
+            }
+            filterQuery.append(")");
+        }
+
+        return filterQuery.toString();
     }
-    
+
     public void enqueueMessage(MessageExchangeDAO mex, CorrelationKeySet 
correlationKeySet) {
         entering("CorrelatorDaoImpl.enqueueMessage");
         String hdr = "enqueueMessage(mex=" + ((MessageExchangeDaoImpl) 
mex)._hobj.getId() + " keySet="
@@ -187,7 +189,7 @@
             mcor.setMessageExchange((HMessageExchange) 
((MessageExchangeDaoImpl) mex)._hobj);
             mcor.setCorrelationKey(aSubSet.toCanonicalString());
             getSession().save(mcor);
-            
+
             if (__log.isDebugEnabled())
                 __log.debug(hdr + "saved " + mcor);
         }
@@ -223,7 +225,7 @@
         lockQry.setEntity("corr",_hobj);
         lockQry.setReadOnly(true);
         return lockQry.list().isEmpty();
-        
+
     }
 
     public String getCorrelatorId() {
@@ -246,18 +248,18 @@
             __log.debug(hdr + "deleted " + updates + " rows");
     }
 
-     public Collection<CorrelatorMessageDAO> getAllMessages() {
-         Collection<CorrelatorMessageDAO> msgs = new 
ArrayList<CorrelatorMessageDAO>();
-         for (HCorrelatorMessage correlatorMessage : 
_hobj.getMessageCorrelations())
-             msgs.add(new CorrelatorMessageDaoImpl(_sm, correlatorMessage));
-         return msgs;
-     }
-
-     public Collection<MessageRouteDAO> getAllRoutes() {
-         Collection<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>();
-         for (HCorrelatorSelector selector : _hobj.getSelectors())
-             routes.add(new MessageRouteDaoImpl(_sm, selector));
-         return routes;
-     }
+    public Collection<CorrelatorMessageDAO> getAllMessages() {
+        Collection<CorrelatorMessageDAO> msgs = new 
ArrayList<CorrelatorMessageDAO>();
+        for (HCorrelatorMessage correlatorMessage : 
_hobj.getMessageCorrelations())
+            msgs.add(new CorrelatorMessageDaoImpl(_sm, correlatorMessage));
+        return msgs;
+    }
+
+    public Collection<MessageRouteDAO> getAllRoutes() {
+        Collection<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>();
+        for (HCorrelatorSelector selector : _hobj.getSelectors())
+            routes.add(new MessageRouteDaoImpl(_sm, selector));
+        return routes;
+    }
 
 }
\ No newline at end of file


Reply via email to