Author: mriou
Date: Wed Jul 29 23:11:01 2009
New Revision: 799118
URL: http://svn.apache.org/viewvc?rev=799118&view=rev
Log:
ODE-628 Some DBMS (like Ingres) don't support update on queries with join, we
have to use subqueries instead
Modified:
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/SessionManager.java
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionFactoryImpl.java
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
Modified:
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/SessionManager.java
URL:
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/SessionManager.java?rev=799118&r1=799117&r2=799118&view=diff
==============================================================================
---
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/SessionManager.java
(original)
+++
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/SessionManager.java
Wed Jul 29 23:11:01 2009
@@ -27,6 +27,7 @@
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.cfg.Configuration;
+import org.hibernate.cfg.Environment;
import javax.sql.DataSource;
import javax.transaction.TransactionManager;
@@ -43,83 +44,112 @@
* managing sessions.
*/
public class SessionManager {
- private static final Log __log = LogFactory.getLog(SessionManager.class);
+ private static final Log __log = LogFactory.getLog(SessionManager.class);
- public static final String PROP_GUID = "ode.hibernate.guid";
-
- private static final Map<String, TransactionManager> _txManagers =
- Collections.synchronizedMap(new HashMap<String, TransactionManager>());
- private static final Map<String, DataSource> _dataSources =
- Collections.synchronizedMap(new HashMap<String,DataSource>());
-
- private final String _uuid = new UUID().toString();
- private final TransactionManager _txManager;
- private final SessionFactory _sessionFactory;
-
- /** Inaccessible constructor. */
- public SessionManager(Properties env, DataSource ds, TransactionManager tx)
throws HibernateException {
- if(tx == null)
- throw new IllegalArgumentException("Null transaction manager");
-
- _txManager = tx;
- _txManagers.put(_uuid,tx);
- _dataSources.put(_uuid,ds);
-
- _sessionFactory = getDefaultConfiguration()
- .setProperties(env)
- .setProperty(PROP_GUID, _uuid)
- .buildSessionFactory();
- }
-
- TransactionManager getTransactionManager() {
- return _txManager;
- }
-
- public static void registerTransactionManager(String uuid,
TransactionManager txm) {
- _txManagers.put(uuid, txm);
- }
-
- /**
- * Get the current Hibernate Session.
- */
- public Session getSession() {
- return _sessionFactory.getCurrentSession();
- }
-
- /**
- * Returns a hibernate configuration with hibernate DAO objects added as
resources.
- * @return
- * @throws MappingException
- */
- public Configuration getDefaultConfiguration() throws MappingException {
- return new Configuration()
- .addClass(HProcess.class)
- .addClass(HProcessInstance.class)
- .addClass(HCorrelator.class)
- .addClass(HCorrelatorMessage.class)
- .addClass(HCorrelationProperty.class)
- .addClass(HCorrelatorSelector.class)
- .addClass(HMessageExchange.class)
- .addClass(HMessage.class)
- .addClass(HPartnerLink.class)
- .addClass(HScope.class)
- .addClass(HCorrelationSet.class)
- .addClass(HXmlData.class)
- .addClass(HVariableProperty.class)
- .addClass(HBpelEvent.class)
- .addClass(HFaultData.class)
- .addClass(HActivityRecovery.class)
- .addClass(HLargeData.class)
- .addClass(HMessageExchangeProperty.class);
- }
-
- public static TransactionManager getTransactionManager(Properties props) {
- String guid = props.getProperty(PROP_GUID);
- return _txManagers.get(guid);
- }
-
- public static Connection getConnection(Properties props) throws SQLException
{
- String guid = props.getProperty(PROP_GUID);
- return _dataSources.get(guid).getConnection();
- }
+ public static final String PROP_GUID = "ode.hibernate.guid";
+
+ private static final Map<String, TransactionManager> _txManagers =
+ Collections.synchronizedMap(new HashMap<String,
TransactionManager>());
+ private static final Map<String, DataSource> _dataSources =
+ Collections.synchronizedMap(new HashMap<String,DataSource>());
+
+ private static final String[] CANNOT_JOIN_FOR_UPDATE_DIALECTS =
+ {"org.hibernate.dialect.IngresDialect"};
+
+ private final String _uuid = new UUID().toString();
+ private final TransactionManager _txManager;
+ private final SessionFactory _sessionFactory;
+ private boolean _canJoinForUpdate = true;
+
+ /** Inaccessible constructor. */
+ public SessionManager(Properties env, DataSource ds, TransactionManager
tx) throws HibernateException {
+ if(tx == null)
+ throw new IllegalArgumentException("Null transaction manager");
+
+ _txManager = tx;
+ _txManagers.put(_uuid,tx);
+ _dataSources.put(_uuid,ds);
+
+ _sessionFactory = getDefaultConfiguration()
+ .setProperties(env)
+ .setProperty(PROP_GUID, _uuid)
+ .buildSessionFactory();
+
+ /*
+ Some Hibernate dialects (like IngresDialect) do not support update for
join.
+ We need to distinguish them and explicitly define subqueries,
otherwise Hibernate
+ implicitly generates joins which causes problems during update for
such DBMS.
+ See org.apache.ode.daohib.bpel.CorrelatorDaoImpl for instance.
+ */
+ String currentHibDialect = env.getProperty(Environment.DIALECT);
+ for (String dialect : CANNOT_JOIN_FOR_UPDATE_DIALECTS) {
+ if (dialect.equals(currentHibDialect)) {
+ _canJoinForUpdate = false;
+ }
+ }
+ }
+
+ TransactionManager getTransactionManager() {
+ return _txManager;
+ }
+
+ public static void registerTransactionManager(String uuid,
TransactionManager txm) {
+ _txManagers.put(uuid, txm);
+ }
+
+ /**
+ * Get the current Hibernate Session.
+ */
+ public Session getSession() {
+ return _sessionFactory.getCurrentSession();
+ }
+
+ /**
+ * Returns flag which shows whether " where .. join ... for update" kind
of queries can be used (supported
+ * by currently effective {...@link org.hibernate.dialect.Dialect}. If
it's {...@code false} than sub-query fallback
+ * should be invoked instead.
+ *
+ * @return currently returns false only for {...@link
org.hibernate.dialect.IngresDialect}
+ */
+ public boolean canJoinForUpdate() {
+ return _canJoinForUpdate;
+ }
+
+
+ /**
+ * Returns a hibernate configuration with hibernate DAO objects added as
resources.
+ * @return
+ * @throws MappingException
+ */
+ public Configuration getDefaultConfiguration() throws MappingException {
+ return new Configuration()
+ .addClass(HProcess.class)
+ .addClass(HProcessInstance.class)
+ .addClass(HCorrelator.class)
+ .addClass(HCorrelatorMessage.class)
+ .addClass(HCorrelationProperty.class)
+ .addClass(HCorrelatorSelector.class)
+ .addClass(HMessageExchange.class)
+ .addClass(HMessage.class)
+ .addClass(HPartnerLink.class)
+ .addClass(HScope.class)
+ .addClass(HCorrelationSet.class)
+ .addClass(HXmlData.class)
+ .addClass(HVariableProperty.class)
+ .addClass(HBpelEvent.class)
+ .addClass(HFaultData.class)
+ .addClass(HActivityRecovery.class)
+ .addClass(HLargeData.class)
+ .addClass(HMessageExchangeProperty.class);
+ }
+
+ public static TransactionManager getTransactionManager(Properties props) {
+ String guid = props.getProperty(PROP_GUID);
+ return _txManagers.get(guid);
+ }
+
+ public static Connection getConnection(Properties props) throws
SQLException {
+ String guid = props.getProperty(PROP_GUID);
+ return _dataSources.get(guid).getConnection();
+ }
}
Modified:
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionFactoryImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionFactoryImpl.java?rev=799118&r1=799117&r2=799118&view=diff
==============================================================================
---
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionFactoryImpl.java
(original)
+++
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionFactoryImpl.java
Wed Jul 29 23:11:01 2009
@@ -132,9 +132,9 @@
}
_sessionManager = createSessionManager(properties, _ds, _tm);
}
-
+
protected SessionManager createSessionManager(Properties properties,
DataSource ds, TransactionManager tm) {
- return new SessionManager(properties, ds, tm);
+ return new SessionManager(properties, ds, tm);
}
private static final String DEFAULT_HIBERNATE_DIALECT =
"org.hibernate.dialect.DerbyDialect";
@@ -160,6 +160,8 @@
// Oracle 8 and Oracle >8
HIBERNATE_DIALECTS.put("Apache Derby", new
DialectFactory.VersionInsensitiveMapper(
"org.hibernate.dialect.DerbyDialect"));
+ HIBERNATE_DIALECTS.put("INGRES", new
DialectFactory.VersionInsensitiveMapper(
+ "org.hibernate.dialect.IngresDialect"));
}
public void shutdown() {
Modified:
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java?rev=799118&r1=799117&r2=799118&view=diff
==============================================================================
---
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
(original)
+++
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
Wed Jul 29 23:11:01 2009
@@ -47,6 +47,8 @@
/** filter for finding a matching selector. */
private static final String LOCK_SELECTORS = "update from
HCorrelatorSelector as hs set hs.lock = hs.lock+1 where hs.processType =
:processType";
private static final String FLTR_SELECTORS = "from HCorrelatorSelector as
hs where hs.processType = :processType and hs.correlator.correlatorId =
:correlatorId";
+ private static final String FLTR_SELECTORS_SUBQUERY = ("from
HCorrelatorSelector as hs where hs.processType = :processType and
hs.correlatorId = " +
+ "(select hc.id from HCorrelator as hc where hc.correlatorId =
:correlatorId )").intern();
/** Query for removing routes. */
private static final String QRY_DELSELECTORS = "delete from
HCorrelatorSelector where groupId = ? and instance = ?";
@@ -62,19 +64,19 @@
@SuppressWarnings("unchecked")
public MessageExchangeDAO dequeueMessage(CorrelationKeySet keySet) {
entering("CorrelatorDaoImpl.dequeueMessage");
-
+
MessageExchangeDAO mex = null;
-
+
String hdr = "dequeueMessage(" + keySet + "): ";
__log.debug(hdr);
List<CorrelationKeySet> subSets = keySet.findSubSets();
- Query qry = getSession().createFilter(_hobj.getMessageCorrelations(),
- generateUnmatchedQuery(subSets));
- for( int i = 0; i < subSets.size(); i++ ) {
- qry.setString("s" + i, subSets.get(i).toCanonicalString());
- }
-
+ Query qry = getSession().createFilter(_hobj.getMessageCorrelations(),
+ generateUnmatchedQuery(subSets));
+ for( int i = 0; i < subSets.size(); i++ ) {
+ qry.setString("s" + i, subSets.get(i).toCanonicalString());
+ }
+
// We really should consider the possibility of multiple messages
matching a criteria.
// When the message is handled, its not too convenient to attempt to
determine if the
// received message conflicts with one already received.
@@ -84,15 +86,15 @@
if (__log.isDebugEnabled())
__log.debug(hdr + "did not find a MESSAGE entry.");
} else {
- HCorrelatorMessage mcor = (HCorrelatorMessage) mcors.next();
+ HCorrelatorMessage mcor = (HCorrelatorMessage) mcors.next();
if (__log.isDebugEnabled())
- __log.debug(hdr + "found MESSAGE entry " +
mcor.getMessageExchange());
- mex = new MessageExchangeDaoImpl(_sm,
mcor.getMessageExchange());
+ __log.debug(hdr + "found MESSAGE entry " +
mcor.getMessageExchange());
+ mex = new MessageExchangeDaoImpl(_sm,
mcor.getMessageExchange());
}
} finally {
Hibernate.close(mcors);
}
-
+
return mex;
}
@@ -106,8 +108,8 @@
String processType = new QName(_hobj.getProcess().getTypeNamespace(),
_hobj.getProcess().getTypeName()).toString();
List<CorrelationKeySet> subSets = keySet.findSubSets();
-
- Query q =
getSession().createQuery(generateSelectorQuery(FLTR_SELECTORS, subSets));
+
+ Query q =
getSession().createQuery(generateSelectorQuery(_sm.canJoinForUpdate() ?
FLTR_SELECTORS : FLTR_SELECTORS_SUBQUERY, subSets));
q.setString("processType", processType);
q.setString("correlatorId", _hobj.getCorrelatorId());
@@ -120,7 +122,7 @@
List<HProcessInstance> targets = new ArrayList<HProcessInstance>();
for (HCorrelatorSelector selector :
(List<HCorrelatorSelector>)q.list()) {
if (selector != null) {
- boolean isRoutePolicyOne = selector.getRoute() == null ||
"one".equals(selector.getRoute());
+ boolean isRoutePolicyOne = selector.getRoute() == null ||
"one".equals(selector.getRoute());
if ("all".equals(selector.getRoute()) ||
(isRoutePolicyOne &&
!targets.contains(selector.getInstance()))) {
routes.add(new MessageRouteDaoImpl(_sm, selector));
@@ -135,43 +137,43 @@
}
private String generateUnmatchedQuery(List<CorrelationKeySet> subSets) {
- StringBuffer filterQuery = new StringBuffer();
-
- if( subSets.size() == 1 ) {
- filterQuery.append(" where this.correlationKey = :s0");
- } else if( subSets.size() > 1 ) {
- filterQuery.append(" where this.correlationKey in(");
- for( int i = 0; i < subSets.size(); i++ ) {
- if( i > 0 ) {
- filterQuery.append(", ");
- }
- filterQuery.append(":s").append(i);
- }
- filterQuery.append(")");
- }
-
- return filterQuery.toString();
+ StringBuffer filterQuery = new StringBuffer();
+
+ if( subSets.size() == 1 ) {
+ filterQuery.append(" where this.correlationKey = :s0");
+ } else if( subSets.size() > 1 ) {
+ filterQuery.append(" where this.correlationKey in(");
+ for( int i = 0; i < subSets.size(); i++ ) {
+ if( i > 0 ) {
+ filterQuery.append(", ");
+ }
+ filterQuery.append(":s").append(i);
+ }
+ filterQuery.append(")");
+ }
+
+ return filterQuery.toString();
}
-
+
private String generateSelectorQuery(String header,
List<CorrelationKeySet> subSets) {
- StringBuffer filterQuery = new StringBuffer(header);
-
- if( subSets.size() == 1 ) {
- filterQuery.append(" and hs.correlationKey = :s0");
- } else if( subSets.size() > 1 ) {
- filterQuery.append(" and hs.correlationKey in(");
- for( int i = 0; i < subSets.size(); i++ ) {
- if( i > 0 ) {
- filterQuery.append(", ");
- }
- filterQuery.append(":s").append(i);
- }
- filterQuery.append(")");
- }
-
- return filterQuery.toString();
+ StringBuffer filterQuery = new StringBuffer(header);
+
+ if( subSets.size() == 1 ) {
+ filterQuery.append(" and hs.correlationKey = :s0");
+ } else if( subSets.size() > 1 ) {
+ filterQuery.append(" and hs.correlationKey in(");
+ for( int i = 0; i < subSets.size(); i++ ) {
+ if( i > 0 ) {
+ filterQuery.append(", ");
+ }
+ filterQuery.append(":s").append(i);
+ }
+ filterQuery.append(")");
+ }
+
+ return filterQuery.toString();
}
-
+
public void enqueueMessage(MessageExchangeDAO mex, CorrelationKeySet
correlationKeySet) {
entering("CorrelatorDaoImpl.enqueueMessage");
String hdr = "enqueueMessage(mex=" + ((MessageExchangeDaoImpl)
mex)._hobj.getId() + " keySet="
@@ -187,7 +189,7 @@
mcor.setMessageExchange((HMessageExchange)
((MessageExchangeDaoImpl) mex)._hobj);
mcor.setCorrelationKey(aSubSet.toCanonicalString());
getSession().save(mcor);
-
+
if (__log.isDebugEnabled())
__log.debug(hdr + "saved " + mcor);
}
@@ -223,7 +225,7 @@
lockQry.setEntity("corr",_hobj);
lockQry.setReadOnly(true);
return lockQry.list().isEmpty();
-
+
}
public String getCorrelatorId() {
@@ -246,18 +248,18 @@
__log.debug(hdr + "deleted " + updates + " rows");
}
- public Collection<CorrelatorMessageDAO> getAllMessages() {
- Collection<CorrelatorMessageDAO> msgs = new
ArrayList<CorrelatorMessageDAO>();
- for (HCorrelatorMessage correlatorMessage :
_hobj.getMessageCorrelations())
- msgs.add(new CorrelatorMessageDaoImpl(_sm, correlatorMessage));
- return msgs;
- }
-
- public Collection<MessageRouteDAO> getAllRoutes() {
- Collection<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>();
- for (HCorrelatorSelector selector : _hobj.getSelectors())
- routes.add(new MessageRouteDaoImpl(_sm, selector));
- return routes;
- }
+ public Collection<CorrelatorMessageDAO> getAllMessages() {
+ Collection<CorrelatorMessageDAO> msgs = new
ArrayList<CorrelatorMessageDAO>();
+ for (HCorrelatorMessage correlatorMessage :
_hobj.getMessageCorrelations())
+ msgs.add(new CorrelatorMessageDaoImpl(_sm, correlatorMessage));
+ return msgs;
+ }
+
+ public Collection<MessageRouteDAO> getAllRoutes() {
+ Collection<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>();
+ for (HCorrelatorSelector selector : _hobj.getSelectors())
+ routes.add(new MessageRouteDaoImpl(_sm, selector));
+ return routes;
+ }
}
\ No newline at end of file