Author: rr
Date: Mon Oct 19 16:38:56 2009
New Revision: 826717
URL: http://svn.apache.org/viewvc?rev=826717&view=rev
Log:
ODE-685: fixes for OpenJPA
Modified:
ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/BPELDAOConnectionFactoryImpl.java
ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageDAOImpl.java
ode/branches/APACHE_ODE_1.X/jbi/src/test/jbi/ode-jbi.properties
ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/log4j.properties
Modified:
ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/BPELDAOConnectionFactoryImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/BPELDAOConnectionFactoryImpl.java?rev=826717&r1=826716&r2=826717&view=diff
==============================================================================
---
ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/BPELDAOConnectionFactoryImpl.java
(original)
+++
ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/BPELDAOConnectionFactoryImpl.java
Mon Oct 19 16:38:56 2009
@@ -99,7 +99,7 @@
propMap.put("openjpa.ManagedRuntime", new JpaTxMgrProvider(_tm));
propMap.put("openjpa.ConnectionFactory", _ds);
propMap.put("openjpa.ConnectionFactoryMode", "managed");
- propMap.put("openjpa.FlushBeforeQueries", "false");
+// propMap.put("openjpa.FlushBeforeQueries", "false");
propMap.put("openjpa.FetchBatchSize", 1000);
propMap.put("openjpa.jdbc.TransactionIsolation", "read-committed");
@@ -141,4 +141,4 @@
public DataSource getDataSource() {
return _ds;
}
-}
\ No newline at end of file
+}
Modified:
ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java?rev=826717&r1=826716&r2=826717&view=diff
==============================================================================
---
ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
(original)
+++
ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
Mon Oct 19 16:38:56 2009
@@ -19,6 +19,8 @@
package org.apache.ode.dao.jpa;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.common.CorrelationKeySet;
import org.apache.ode.bpel.dao.*;
@@ -29,42 +31,49 @@
import java.util.List;
@Entity
-...@table(name="ODE_CORRELATOR")
-...@namedqueries({
- @NamedQuery(name=CorrelatorDAOImpl.DELETE_CORRELATORS_BY_PROCESS,
query="delete from CorrelatorDAOImpl as c where c._process = :process")
-})
+...@table(name = "ODE_CORRELATOR")
+...@namedqueries( { @NamedQuery(name =
CorrelatorDAOImpl.DELETE_CORRELATORS_BY_PROCESS, query = "delete from
CorrelatorDAOImpl as c where c._process = :process") })
public class CorrelatorDAOImpl extends OpenJPADAO implements CorrelatorDAO {
- public final static String DELETE_CORRELATORS_BY_PROCESS =
"DELETE_CORRELATORS_BY_PROCESS";
- private final static String ROUTE_BY_CKEY_HEADER = "select route from
MessageRouteDAOImpl as route where route._correlator._process._processType =
:ptype and route._correlator._correlatorKey = :corrkey";
-
- @Id @Column(name="CORRELATOR_ID")
- @GeneratedValue(strategy=GenerationType.AUTO)
+ private static Log __log = LogFactory.getLog(CorrelatorDAOImpl.class);
+ public final static String DELETE_CORRELATORS_BY_PROCESS =
"DELETE_CORRELATORS_BY_PROCESS";
+ private final static String ROUTE_BY_CKEY_HEADER = "select route from
MessageRouteDAOImpl as route where route._correlator._process._processType =
:ptype and route._correlator._correlatorKey = :corrkey";
+
+ @Id
+ @Column(name = "CORRELATOR_ID")
+ @GeneratedValue(strategy = GenerationType.AUTO)
@SuppressWarnings("unused")
private Long _correlatorId;
- @Basic @Column(name="CORRELATOR_KEY")
+ @Basic
+ @Column(name = "CORRELATOR_KEY")
private String _correlatorKey;
-
@OneToMany(targetEntity=MessageRouteDAOImpl.class,mappedBy="_correlator",fetch=FetchType.EAGER,cascade={CascadeType.MERGE,
CascadeType.PERSIST, CascadeType.REFRESH})
+ @OneToMany(targetEntity = MessageRouteDAOImpl.class, mappedBy =
"_correlator", fetch = FetchType.EAGER, cascade = { CascadeType.MERGE,
CascadeType.PERSIST, CascadeType.REFRESH })
private Collection<MessageRouteDAOImpl> _routes = new
ArrayList<MessageRouteDAOImpl>();
-
@OneToMany(targetEntity=MessageExchangeDAOImpl.class,mappedBy="_correlator",fetch=FetchType.LAZY,cascade={CascadeType.MERGE,
CascadeType.PERSIST, CascadeType.REFRESH})
+ @OneToMany(targetEntity = MessageExchangeDAOImpl.class, mappedBy =
"_correlator", fetch = FetchType.LAZY, cascade = { CascadeType.MERGE,
CascadeType.PERSIST, CascadeType.REFRESH })
private Collection<MessageExchangeDAOImpl> _exchanges = new
ArrayList<MessageExchangeDAOImpl>();
- @ManyToOne(fetch= FetchType.LAZY,cascade={CascadeType.PERSIST})
@Column(name="PROC_ID")
+ @ManyToOne(fetch = FetchType.LAZY, cascade = { CascadeType.PERSIST })
+ @Column(name = "PROC_ID")
private ProcessDAOImpl _process;
- public CorrelatorDAOImpl(){}
+ public CorrelatorDAOImpl() {
+ }
+
public CorrelatorDAOImpl(String correlatorKey, ProcessDAOImpl process) {
_correlatorKey = correlatorKey;
_process = process;
}
public void addRoute(String routeGroupId, ProcessInstanceDAO target, int
index, CorrelationKeySet correlationKeySet, String routePolicy) {
- MessageRouteDAOImpl mr = new MessageRouteDAOImpl(correlationKeySet,
- routeGroupId, index, (ProcessInstanceDAOImpl) target, this,
routePolicy);
+ if (__log.isDebugEnabled()) {
+ __log.debug("addRoute " + routeGroupId + " " + target + " " +
index + " " + correlationKeySet + " " + routePolicy);
+ }
+ MessageRouteDAOImpl mr = new MessageRouteDAOImpl(correlationKeySet,
routeGroupId, index, (ProcessInstanceDAOImpl) target, this, routePolicy);
_routes.add(mr);
+ getEM().flush();
}
public MessageExchangeDAO dequeueMessage(CorrelationKeySet
correlationKeySet) {
- // TODO: this thing does not seem to be scalable: loading up based on a
correlator???
- for (Iterator<MessageExchangeDAOImpl> itr=_exchanges.iterator();
itr.hasNext();){
+ // TODO: this thing does not seem to be scalable: loading up based on
a correlator???
+ for (Iterator<MessageExchangeDAOImpl> itr = _exchanges.iterator();
itr.hasNext();) {
MessageExchangeDAOImpl mex = itr.next();
if (mex.getCorrelationKeySet().isRoutableTo(correlationKeySet,
false)) {
itr.remove();
@@ -74,8 +83,7 @@
return null;
}
- public void enqueueMessage(MessageExchangeDAO mex,
- CorrelationKeySet correlationKeySet) {
+ public void enqueueMessage(MessageExchangeDAO mex, CorrelationKeySet
correlationKeySet) {
MessageExchangeDAOImpl mexImpl = (MessageExchangeDAOImpl) mex;
mexImpl.setCorrelationKeySet(correlationKeySet);
_exchanges.add(mexImpl);
@@ -88,52 +96,61 @@
@SuppressWarnings("unchecked")
public List<MessageRouteDAO> findRoute(CorrelationKeySet
correlationKeySet) {
- List<CorrelationKeySet> subSets = correlationKeySet.findSubSets();
- Query qry =
getEM().createQuery(generateSelectorQuery(ROUTE_BY_CKEY_HEADER, subSets));
+ if (__log.isDebugEnabled()) {
+ __log.debug("findRoute " + correlationKeySet);
+ }
+ List<CorrelationKeySet> subSets = correlationKeySet.findSubSets();
+ Query qry =
getEM().createQuery(generateSelectorQuery(ROUTE_BY_CKEY_HEADER, subSets));
qry.setParameter("ptype", _process.getType().toString());
qry.setParameter("corrkey", _correlatorKey);
- for( int i = 0; i < subSets.size(); i++ ) {
- qry.setParameter("s" + i, subSets.get(i).toCanonicalString());
- }
-
+ for (int i = 0; i < subSets.size(); i++) {
+ qry.setParameter("s" + i, subSets.get(i).toCanonicalString());
+ }
+
List<MessageRouteDAO> candidateRoutes = (List<MessageRouteDAO>)
qry.getResultList();
if (candidateRoutes.size() > 0) {
- List<MessageRouteDAO> matchingRoutes = new
ArrayList<MessageRouteDAO>();
- boolean routed = false;
+ List<MessageRouteDAO> matchingRoutes = new
ArrayList<MessageRouteDAO>();
+ boolean routed = false;
for (int i = 0; i < candidateRoutes.size(); i++) {
- MessageRouteDAO route = candidateRoutes.get(i);
- if ("all".equals(route.getRoute())) {
- matchingRoutes.add(route);
- } else {
- if (!routed) {
- matchingRoutes.add(route);
- }
- routed = true;
- }
+ MessageRouteDAO route = candidateRoutes.get(i);
+ if ("all".equals(route.getRoute())) {
+ matchingRoutes.add(route);
+ } else {
+ if (!routed) {
+ matchingRoutes.add(route);
+ }
+ routed = true;
+ }
+ }
+ if (__log.isDebugEnabled()) {
+ __log.debug("findRoute found " + matchingRoutes);
}
return matchingRoutes;
} else {
- return null;
+ if (__log.isDebugEnabled()) {
+ __log.debug("findRoute found nothing");
+ }
+ return null;
}
}
private String generateSelectorQuery(String header,
List<CorrelationKeySet> subSets) {
- StringBuffer filterQuery = new StringBuffer(header);
-
- if( subSets.size() == 1 ) {
- filterQuery.append(" and route._correlationKey = :s0");
- } else if( subSets.size() > 1 ) {
- filterQuery.append(" and route._correlationKey in(");
- for( int i = 0; i < subSets.size(); i++ ) {
- if( i > 0 ) {
- filterQuery.append(", ");
- }
- filterQuery.append(":s").append(i);
- }
- filterQuery.append(")");
- }
-
- return filterQuery.toString();
+ StringBuffer filterQuery = new StringBuffer(header);
+
+ if (subSets.size() == 1) {
+ filterQuery.append(" and route._correlationKey = :s0");
+ } else if (subSets.size() > 1) {
+ filterQuery.append(" and route._correlationKey in(");
+ for (int i = 0; i < subSets.size(); i++) {
+ if (i > 0) {
+ filterQuery.append(", ");
+ }
+ filterQuery.append(":s").append(i);
+ }
+ filterQuery.append(")");
+ }
+
+ return filterQuery.toString();
}
public String getCorrelatorId() {
@@ -146,14 +163,20 @@
public void removeRoutes(String routeGroupId, ProcessInstanceDAO target) {
// remove route across all correlators of the process
- ((ProcessInstanceDAOImpl)target).removeRoutes(routeGroupId);
+ ((ProcessInstanceDAOImpl) target).removeRoutes(routeGroupId);
}
void removeLocalRoutes(String routeGroupId, ProcessInstanceDAO target) {
+ if (__log.isDebugEnabled()) {
+ __log.debug("removeLocalRoutes " + routeGroupId);
+ }
boolean flush = false;
- for (Iterator<MessageRouteDAOImpl> itr=_routes.iterator();
itr.hasNext(); ) {
+ for (Iterator<MessageRouteDAOImpl> itr = _routes.iterator();
itr.hasNext();) {
MessageRouteDAOImpl mr = itr.next();
- if ( mr.getGroupId().equals(routeGroupId) &&
mr.getTargetInstance().equals(target)) {
+ if (mr.getGroupId().equals(routeGroupId) &&
mr.getTargetInstance().equals(target)) {
+ if (__log.isDebugEnabled()) {
+ __log.debug("removing " + mr.getCorrelationKey() + " " +
mr.getIndex() + " " + mr.getRoute());
+ }
itr.remove();
getEM().remove(mr);
flush = true;
Modified:
ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageDAOImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageDAOImpl.java?rev=826717&r1=826716&r2=826717&view=diff
==============================================================================
---
ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageDAOImpl.java
(original)
+++
ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageDAOImpl.java
Mon Oct 19 16:38:56 2009
@@ -19,7 +19,8 @@
package org.apache.ode.dao.jpa;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.dao.MessageDAO;
import org.apache.ode.bpel.dao.MessageExchangeDAO;
import org.apache.ode.utils.DOMUtils;
@@ -42,71 +43,76 @@
import javax.xml.namespace.QName;
@Entity
-...@table(name="ODE_MESSAGE")
-...@namedqueries({
- @NamedQuery(name=MessageDAOImpl.DELETE_MESSAGES_BY_PROCESS,
query="delete from MessageDAOImpl as m where m._messageExchange._process =
:process")
-})
+...@table(name = "ODE_MESSAGE")
+...@namedqueries( { @NamedQuery(name =
MessageDAOImpl.DELETE_MESSAGES_BY_PROCESS, query = "delete from MessageDAOImpl
as m where m._messageExchange._process = :process") })
public class MessageDAOImpl implements MessageDAO {
- public final static String DELETE_MESSAGES_BY_PROCESS =
"DELETE_MESSAGES_BY_PROCESS";
-
- @Id @Column(name="MESSAGE_ID")
- @GeneratedValue(strategy=GenerationType.AUTO)
- @SuppressWarnings("unused")
- private Long _id;
- @Basic @Column(name="TYPE")
+ private static Log __log = LogFactory.getLog(MessageDAOImpl.class);
+ public final static String DELETE_MESSAGES_BY_PROCESS =
"DELETE_MESSAGES_BY_PROCESS";
+
+ @Id
+ @Column(name = "MESSAGE_ID")
+ @GeneratedValue(strategy = GenerationType.AUTO)
+ @SuppressWarnings("unused")
+ private Long _id;
+ @Basic
+ @Column(name = "TYPE")
private String _type;
- @Lob @Column(name="DATA")
+ @Lob
+ @Column(name = "DATA")
private String _data;
- @Lob @Column(name="HEADER")
+ @Lob
+ @Column(name = "HEADER")
private String _header;
- @Transient
- private Element _element;
- @Transient
- private Element _headerElement;
- @ManyToOne(fetch=FetchType.LAZY,cascade={CascadeType.ALL})
@Column(name="MESSAGE_EXCHANGE_ID")
- private MessageExchangeDAOImpl _messageExchange;
-
- public MessageDAOImpl() {
- }
-
- public MessageDAOImpl(QName type, MessageExchangeDAOImpl me) {
- _type = type.toString();
- _messageExchange = me;
- }
-
- public Element getData() {
- if ( _element == null && _data != null && !"".equals(_data)) {
- try {
- _element = DOMUtils.stringToDOM(_data);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- return _element;
- }
+ @ManyToOne(fetch = FetchType.LAZY, cascade = { CascadeType.ALL })
+ @Column(name = "MESSAGE_EXCHANGE_ID")
+ private MessageExchangeDAOImpl _messageExchange;
+
+ public MessageDAOImpl() {
+ }
+
+ public MessageDAOImpl(QName type, MessageExchangeDAOImpl me) {
+ _type = type.toString();
+ _messageExchange = me;
+ }
- public void setData(Element value) {
- if (value == null) return;
+ public Element getData() {
+ if (__log.isDebugEnabled()) {
+ __log.debug("getData " + _id + " " + _data);
+ }
+ try {
+ return _data == null ? null : DOMUtils.stringToDOM(_data);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void setData(Element value) {
+ if (value == null) {
+ if (__log.isDebugEnabled()) {
+ __log.debug("setData " + _id + " null");
+ }
+ return;
+ }
_data = DOMUtils.domToString(value);
- _element = value;
- }
+
+ if (__log.isDebugEnabled()) {
+ __log.debug("setData " + _id + " " + _data);
+ }
+ }
- public Element getHeader() {
- if ( _headerElement == null && _header != null &&
!"".equals(_header)) {
- try {
- _headerElement = DOMUtils.stringToDOM(_header);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- return _headerElement;
- }
+ public Element getHeader() {
+ try {
+ return _header == null ? null : DOMUtils.stringToDOM(_header);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
- public void setHeader(Element value) {
- if (value == null) return;
+ public void setHeader(Element value) {
+ if (value == null)
+ return;
_header = DOMUtils.domToString(value);
- _headerElement = value;
- }
+ }
public MessageExchangeDAO getMessageExchange() {
return _messageExchange;
@@ -116,8 +122,8 @@
return _type == null ? null : QName.valueOf(_type);
}
- public void setType(QName type) {
- _type = type.toString();
- }
+ public void setType(QName type) {
+ _type = type.toString();
+ }
}
Modified: ode/branches/APACHE_ODE_1.X/jbi/src/test/jbi/ode-jbi.properties
URL:
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/jbi/src/test/jbi/ode-jbi.properties?rev=826717&r1=826716&r2=826717&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/jbi/src/test/jbi/ode-jbi.properties (original)
+++ ode/branches/APACHE_ODE_1.X/jbi/src/test/jbi/ode-jbi.properties Mon Oct 19
16:38:56 2009
@@ -52,7 +52,7 @@
# Embedded Database Name [String]
# Name of the embedded Derby database. This is only used if the
# "ode-jbi.db.mode" property is set to "EMBEDDED".
-ode-jbi.db.emb.name=hibdb
+#ode-jbi.db.emb.name=hibdb
#ode-jbi.db.emb.name=jpadb
# Internal Database Configuration
@@ -62,7 +62,7 @@
# DAO Connection Factory class.
# uncomment the following for hibernate.
-ode-jbi.dao.factory=org.apache.ode.daohib.bpel.BpelDAOConnectionFactoryImpl
+#ode-jbi.dao.factory=org.apache.ode.daohib.bpel.BpelDAOConnectionFactoryImpl
# Class name of the message mapper that should be used to convert message
# between ODE / NMS.
@@ -73,6 +73,6 @@
# BPEL Event Listener
# Uncomment the following for a debug output of BPEL navigation events.
-#ode-jbi.event.listeners=org.apache.ode.bpel.common.evt.DebugBpelEventListener
+ode-jbi.event.listeners=org.apache.ode.bpel.common.evt.DebugBpelEventListener
#debugeventlistener.dumpToStdOut=on/off
Modified: ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/log4j.properties
URL:
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/log4j.properties?rev=826717&r1=826716&r2=826717&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/log4j.properties
(original)
+++ ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/log4j.properties Mon Oct
19 16:38:56 2009
@@ -34,8 +34,12 @@
log4j.appender.stdout.layout.conversionpattern=%d{mm...@hh:mm:ss} %-5p
(%13F:%L) %3x - %m%n
log4j.category.org.apache.servicemix.jbi.nmr.flow.seda=DEBUG
-log4j.category.org.apache.ode.jbi.EndpointReferenceContextImpl=DEBUG
+#log4j.category.org.apache.ode.jbi.msgmap=TRACE
+#log4j.category.org.apache.ode.jbi.EndpointReferenceContextImpl=DEBUG
#log4j.category.org.apache.ode.bpel=DEBUG
+#log4j.category.org.apache.ode.dao.jpa.CorrelatorDAOImpl=TRACE
+#log4j.category.org.apache.ode.dao.jpa.MessageDAOImpl=TRACE
log4j.category.httpclient=DEBUG
log4j.category.httpclient.Wire=DEBUG
+#log4j.category.org.apache.ode.bpel.iapi.BpelEventListener=DEBUG