JUDDI-241 get/set replication cfg now works for all sub elements. notify changes works, however getChangeRecords fails when called from within the tomcat container.
Project: http://git-wip-us.apache.org/repos/asf/juddi/repo Commit: http://git-wip-us.apache.org/repos/asf/juddi/commit/22a846dd Tree: http://git-wip-us.apache.org/repos/asf/juddi/tree/22a846dd Diff: http://git-wip-us.apache.org/repos/asf/juddi/diff/22a846dd Branch: refs/heads/JUDDI-241 Commit: 22a846ddb5fe2c79b23967a81899c214312fc2be Parents: 03dce36 Author: Alex <[email protected]> Authored: Sun Nov 23 19:52:30 2014 -0500 Committer: Alex <[email protected]> Committed: Sun Nov 23 19:52:30 2014 -0500 ---------------------------------------------------------------------- .../src/test/resources/META-INF/persistence.xml | 3 +- .../juddi/api/impl/AuthenticatedService.java | 5 + .../org/apache/juddi/api/impl/JUDDIApiImpl.java | 83 +- .../juddi/api/impl/UDDIReplicationImpl.java | 1088 ++++++++++-------- .../apache/juddi/mapping/MappingApiToModel.java | 333 +++--- .../apache/juddi/mapping/MappingModelToApi.java | 27 +- .../java/org/apache/juddi/model/Contact.java | 11 + .../org/apache/juddi/model/ControlMessage.java | 30 + .../main/java/org/apache/juddi/model/Edge.java | 41 +- .../juddi/model/EdgeReceiverAlternate.java | 68 ++ .../main/java/org/apache/juddi/model/Node.java | 328 +++--- .../java/org/apache/juddi/model/Operator.java | 31 +- .../juddi/model/ReplicationConfiguration.java | 46 +- .../model/ReplicationConfigurationNode.java | 72 ++ .../juddi/replication/ReplicationNotifier.java | 84 +- .../apache/juddi/validation/ValidateNode.java | 5 +- .../juddi/validation/ValidateReplication.java | 49 +- .../src/main/resources/messages.properties | 4 +- .../juddi/api/impl/API_160_ReplicationTest.java | 138 ++- .../apache/juddi/api/runtime/CLIServerTest.java | 119 ++ .../apache/juddi/api/runtime/juddiTestimpl.java | 188 +++ .../apache/juddi/api/runtime/replicantImpl.java | 70 ++ .../replication/ReplicationNotifierTest.java | 5 - .../src/test/resources/META-INF/persistence.xml | 3 + juddi-core/src/test/resources/META-INF/uddi.xml | 24 + .../src/main/resources/META-INF/persistence.xml | 3 +- .../org/apache/juddi/samples/EntryPoint.java | 89 +- .../apache/juddi/samples/JuddiAdminService.java | 237 +++- .../apache/juddi/samples/UddiReplication.java | 4 +- .../resources/META-INF/simple-publish-uddi.xml | 27 +- .../src/test/resources/META-INF/persistence.xml | 3 +- .../WEB-INF/classes/META-INF/persistence.xml | 2 + .../WEB-INF/classes/META-INF/persistence.xml | 2 + .../WEB-INF/classes/META-INF/persistence.xml | 2 + .../WEB-INF/classes/META-INF/persistence.xml | 3 + pom.xml | 2 +- uddi-ws/pom.xml | 73 +- .../java/org/apache/juddi/repl_v3/EdgeExt.java | 36 - .../main/java/org/uddi/api_v3/GetAuthToken.java | 128 ++- .../org/uddi/repl_v3/CommunicationGraph.java | 1 + .../main/java/org/uddi/repl_v3/Operator.java | 300 ++--- .../uddi/repl_v3/ReplicationConfiguration.java | 17 +- .../v3_service/UDDIReplicationPortType.java | 38 +- .../juddi/api_v3/GetPublisherDetailTest.java | 33 + 44 files changed, 2516 insertions(+), 1339 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/juddi/blob/22a846dd/juddi-core-openjpa/src/test/resources/META-INF/persistence.xml ---------------------------------------------------------------------- diff --git a/juddi-core-openjpa/src/test/resources/META-INF/persistence.xml b/juddi-core-openjpa/src/test/resources/META-INF/persistence.xml index 7bf55b0..f42c65f 100644 --- a/juddi-core-openjpa/src/test/resources/META-INF/persistence.xml +++ b/juddi-core-openjpa/src/test/resources/META-INF/persistence.xml @@ -78,7 +78,8 @@ <class>org.apache.juddi.model.ReplicationConfiguration</class> <class>org.apache.juddi.model.Edge</class> <class>org.apache.juddi.model.ControlMessage</class> - <class>org.apache.juddi.model.ReplicationMessage</class> + <class>org.apache.juddi.model.ReplicationConfigurationNode</class> + <class>org.apache.juddi.model.EdgeReceiverAlternate</class> <properties> <property name="openjpa.jdbc.SynchronizeMappings" value="buildSchema(SchemaAction='dropDB,add')"/> http://git-wip-us.apache.org/repos/asf/juddi/blob/22a846dd/juddi-core/src/main/java/org/apache/juddi/api/impl/AuthenticatedService.java ---------------------------------------------------------------------- diff --git a/juddi-core/src/main/java/org/apache/juddi/api/impl/AuthenticatedService.java b/juddi-core/src/main/java/org/apache/juddi/api/impl/AuthenticatedService.java index d69afbf..767cbee 100644 --- a/juddi-core/src/main/java/org/apache/juddi/api/impl/AuthenticatedService.java +++ b/juddi-core/src/main/java/org/apache/juddi/api/impl/AuthenticatedService.java @@ -50,10 +50,15 @@ public abstract class AuthenticatedService { public static final int AUTHTOKEN_RETIRED = 0; static final Log logger = LogFactory.getLog(AuthenticatedService.class); protected String node = "UNDEFINED_NODE_NAME"; + protected String baseUrlSSL="UNDEFINED"; + protected String baseUrl="UNDEFINED"; public AuthenticatedService(){ try { node = AppConfig.getConfiguration().getString(Property.JUDDI_NODE_ID, "UNDEFINED_NODE_NAME"); + node=node.trim(); + baseUrlSSL=AppConfig.getConfiguration().getString(Property.JUDDI_BASE_URL_SECURE, Property.DEFAULT_BASE_URL_SECURE); + baseUrlSSL=AppConfig.getConfiguration().getString(Property.JUDDI_BASE_URL, Property.DEFAULT_BASE_URL); } catch (ConfigurationException ex) { logger.fatal(null, ex); } http://git-wip-us.apache.org/repos/asf/juddi/blob/22a846dd/juddi-core/src/main/java/org/apache/juddi/api/impl/JUDDIApiImpl.java ---------------------------------------------------------------------- diff --git a/juddi-core/src/main/java/org/apache/juddi/api/impl/JUDDIApiImpl.java b/juddi-core/src/main/java/org/apache/juddi/api/impl/JUDDIApiImpl.java index 268616e..dd201fe 100644 --- a/juddi-core/src/main/java/org/apache/juddi/api/impl/JUDDIApiImpl.java +++ b/juddi-core/src/main/java/org/apache/juddi/api/impl/JUDDIApiImpl.java @@ -19,6 +19,7 @@ package org.apache.juddi.api.impl; import java.io.StringWriter; import java.math.BigInteger; import java.rmi.RemoteException; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; @@ -30,6 +31,7 @@ import javax.jws.WebService; import javax.persistence.EntityManager; import javax.persistence.EntityTransaction; import javax.persistence.Query; +import javax.xml.bind.JAXB; import javax.xml.bind.JAXBContext; import javax.xml.bind.Marshaller; import javax.xml.ws.Holder; @@ -73,7 +75,6 @@ import org.apache.juddi.model.ClientSubscriptionInfo; import org.apache.juddi.model.Node; import org.apache.juddi.model.Publisher; import org.apache.juddi.model.ReplicationConfiguration; -import org.apache.juddi.model.SubscriptionMatch; import org.apache.juddi.model.Tmodel; import org.apache.juddi.model.UddiEntityPublisher; import org.apache.juddi.subscription.NotificationList; @@ -89,19 +90,24 @@ import org.apache.juddi.validation.ValidateNode; import org.apache.juddi.validation.ValidatePublish; import org.apache.juddi.validation.ValidatePublisher; import org.apache.juddi.validation.ValidateReplication; -import org.apache.juddi.validation.ValidateSubscription; import org.uddi.api_v3.AuthToken; import org.uddi.api_v3.BusinessInfo; import org.uddi.api_v3.BusinessInfos; +import org.uddi.api_v3.Contact; import org.uddi.api_v3.DeleteTModel; import org.uddi.api_v3.DispositionReport; import org.uddi.api_v3.GetRegisteredInfo; import org.uddi.api_v3.InfoSelection; +import org.uddi.api_v3.KeyType; +import org.uddi.api_v3.PersonName; import org.uddi.api_v3.RegisteredInfo; import org.uddi.api_v3.Result; import org.uddi.api_v3.SaveTModel; import org.uddi.api_v3.TModelInfo; import org.uddi.api_v3.TModelInfos; +import org.uddi.repl_v3.CommunicationGraph; +import org.uddi.repl_v3.Operator; +import org.uddi.repl_v3.OperatorStatusType; import org.uddi.sub_v3.GetSubscriptionResults; import org.uddi.sub_v3.Subscription; import org.uddi.sub_v3.SubscriptionResultsList; @@ -119,7 +125,7 @@ import org.uddi.v3_service.UDDISubscriptionPortType; */ @WebService(serviceName = "JUDDIApiService", endpointInterface = "org.apache.juddi.v3_service.JUDDIApiPortType", - targetNamespace = "urn:juddi-apache-org:v3_service") + targetNamespace = "urn:juddi-apache-org:v3_service", wsdlLocation = "classpath:/juddi_api_v1.wsdl") public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortType { private Log log = LogFactory.getLog(this.getClass()); @@ -786,6 +792,7 @@ public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortTy * @throws RemoteException */ @SuppressWarnings("unchecked") + @Override public SyncSubscriptionDetail invokeSyncSubscription( SyncSubscription body) throws DispositionReportFaultMessage, RemoteException { @@ -1273,30 +1280,42 @@ public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortTy if (!((Publisher) publisher).isAdmin()) { throw new UserMismatchException(new ErrorMessage("errors.AdminReqd")); } - new ValidateReplication(publisher).validateSetReplicationNodes(replicationConfiguration,em); + new ValidateReplication(publisher).validateSetReplicationNodes(replicationConfiguration, em, node); org.apache.juddi.model.ReplicationConfiguration model = null; try { model = (ReplicationConfiguration) em.createQuery("select c FROM ReplicationConfiguration c order by c.serialNumber desc").getSingleResult(); } catch (Exception ex) { } + SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddkkmmZ"); if (model == null) { + //this is a brand new configuration model = new ReplicationConfiguration(); - MappingApiToModel.mapReplicationConfiguration(replicationConfiguration, model, em); + MappingApiToModel.mapReplicationConfiguration(replicationConfiguration, model,em); model.setSerialNumber(System.currentTimeMillis()); + model.setTimeOfConfigurationUpdate(sdf.format(new Date())); em.persist(model); + //if (newReplicationNode(model)){ + //tell the replication notifier to start transfering with + //the first change record + //} } else { - //long oldid = model.getSerialNumber(); - em.remove(model); + //a config exists, remove it, add the new one + //spec doesn't appear to mention if recording a change history on the config is required + //assuming not. + //em.remove(model); model = new ReplicationConfiguration(); MappingApiToModel.mapReplicationConfiguration(replicationConfiguration, model, em); model.setSerialNumber(System.currentTimeMillis()); - em.persist(model); + + model.setTimeOfConfigurationUpdate(sdf.format(new Date())); + em.merge(model); } tx.commit(); + //UDDIReplicationImpl.notifyConfigurationChange(replicationConfiguration); long procTime = System.currentTimeMillis() - startTime; serviceCounter.update(JUDDIQuery.SET_REPLICATION_NODES, QueryStatus.SUCCESS, procTime); @@ -1305,14 +1324,22 @@ public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortTy serviceCounter.update(JUDDIQuery.SET_REPLICATION_NODES, QueryStatus.FAILED, procTime); throw drfm; - } finally { + } catch (Exception ex){ + logger.error(ex,ex); + JAXB.marshal(replicationConfiguration, System.out); + throw new FatalErrorException(new ErrorMessage("E_fatalError", ex.getMessage())); + } + finally { if (tx.isActive()) { tx.rollback(); } em.close(); } + DispositionReport d = new DispositionReport(); + Result res = new Result(); - return new DispositionReport(); + d.getResult().add(res); + return d; } @Override @@ -1335,7 +1362,7 @@ public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortTy sql.toString(); Query qry = em.createQuery(sql.toString()); qry.setMaxResults(1); - + org.apache.juddi.model.ReplicationConfiguration resultList = (org.apache.juddi.model.ReplicationConfiguration) qry.getSingleResult(); MappingModelToApi.mapReplicationConfiguration(resultList, r); tx.commit(); @@ -1347,15 +1374,36 @@ public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortTy serviceCounter.update(JUDDIQuery.GET_ALL_NODES, QueryStatus.FAILED, procTime); throw drfm; - } catch (Exception ex){ - //possible that there is no config to return - r.setCommunicationGraph(null); - logger.warn("Error caught, is there a replication config is avaiable?", ex); + } catch (Exception ex) { + //possible that there is no config to return + r.setCommunicationGraph(new CommunicationGraph()); + Operator op = new Operator(); + op.setOperatorNodeID(node); + op.setSoapReplicationURL(baseUrlSSL + "/services/replication"); + //TODO lookup from the root business + + op.getContact().add(new Contact()); + op.getContact().get(0).getPersonName().add(new PersonName("Unknown", null)); + op.setOperatorStatus(OperatorStatusType.NORMAL); + + r.getOperator().add(op); + r.getCommunicationGraph().getNode().add(node); + r.getCommunicationGraph().getControlledMessage().add("*"); + logger.warn("Error caught, is there a replication config is avaiable? Returning a default config (no replication): " + ex.getMessage()); + logger.debug("Error caught, is there a replication config is avaiable? Returning a default config (no replication): ", ex); long procTime = System.currentTimeMillis() - startTime; + r.setSerialNumber(0); + SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddkkmmZ"); + r.setTimeOfConfigurationUpdate(sdf.format(new Date())); + r.setRegistryContact(new org.uddi.repl_v3.ReplicationConfiguration.RegistryContact()); + //TODO pull from root business + r.getRegistryContact().setContact(new Contact()); + r.getRegistryContact().getContact().getPersonName().add(new PersonName("Unknown", null)); + serviceCounter.update(JUDDIQuery.GET_REPLICATION_NODES, QueryStatus.FAILED, procTime); - - }finally { + + } finally { if (tx.isActive()) { tx.rollback(); } @@ -1364,6 +1412,7 @@ public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortTy r.setMaximumTimeToGetChanges(BigInteger.ONE); r.setMaximumTimeToSyncRegistry(BigInteger.ONE); + JAXB.marshal(r, System.out); return r; } http://git-wip-us.apache.org/repos/asf/juddi/blob/22a846dd/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java ---------------------------------------------------------------------- diff --git a/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java b/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java index 15ed7a8..3971e6b 100644 --- a/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java +++ b/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java @@ -28,11 +28,19 @@ import java.util.TimerTask; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.logging.Level; import java.util.logging.Logger; +import javax.jws.WebMethod; +import javax.jws.WebParam; +import javax.jws.WebResult; +import javax.jws.WebService; import javax.persistence.EntityManager; import javax.persistence.EntityTransaction; import javax.persistence.Query; +import javax.xml.bind.JAXB; +import javax.xml.bind.annotation.XmlSeeAlso; import javax.xml.datatype.DatatypeConfigurationException; import javax.xml.ws.BindingProvider; +import javax.xml.ws.RequestWrapper; +import javax.xml.ws.ResponseWrapper; import org.apache.commons.configuration.ConfigurationException; import static org.apache.juddi.api.impl.AuthenticatedService.logger; import org.apache.juddi.api.util.QueryStatus; @@ -45,6 +53,7 @@ import org.apache.juddi.mapping.MappingModelToApi; import org.apache.juddi.model.BusinessEntity; import org.apache.juddi.model.BusinessService; import org.apache.juddi.model.Node; +import org.apache.juddi.model.Operator; import org.apache.juddi.model.Tmodel; import static org.apache.juddi.replication.ReplicationNotifier.FetchEdges; import org.apache.juddi.v3.client.UDDIService; @@ -75,562 +84,625 @@ import org.uddi.v3_service.UDDIReplicationPortType; * <li>do_ping</li> * <li>get_highWaterMarks</li> * - * @author <a href="mailto:[email protected]">Alex O'Ree<a/> + * @author <a href="mailto:[email protected]">Alex O'Ree</a> */ +@WebService(serviceName = "UDDI_Replication_PortType", targetNamespace = "urn:uddi-org:repl_v3_portType", + endpointInterface = "org.uddi.v3_service.UDDIReplicationPortType") +@XmlSeeAlso({ + org.uddi.custody_v3.ObjectFactory.class, + org.uddi.repl_v3.ObjectFactory.class, + org.uddi.subr_v3.ObjectFactory.class, + org.uddi.api_v3.ObjectFactory.class, + org.uddi.vscache_v3.ObjectFactory.class, + org.uddi.vs_v3.ObjectFactory.class, + org.uddi.sub_v3.ObjectFactory.class, + org.w3._2000._09.xmldsig_.ObjectFactory.class, + org.uddi.policy_v3.ObjectFactory.class, + org.uddi.policy_v3_instanceparms.ObjectFactory.class +}) public class UDDIReplicationImpl extends AuthenticatedService implements UDDIReplicationPortType { - - private UDDIServiceCounter serviceCounter; - - private static PullTimerTask timer = null; - private long startBuffer = 20000l;//AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_START_BUFFER, 20000l); // 20s startup delay default - private long interval = 300000l;// AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 300000l); //5 min default - - private static UDDIPublicationImpl pub = null; - - public UDDIReplicationImpl() { - super(); - if (pub == null) { - pub = new UDDIPublicationImpl(); - } - serviceCounter = ServiceCounterLifecycleResource.getServiceCounter(UDDIReplicationImpl.class); - Init(); - try { - startBuffer = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_START_BUFFER, 20000l); // 20s startup delay default - interval = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 300000l); //5 min default - } catch (ConfigurationException ex) { - logger.fatal(ex); - } - - } - - private synchronized void Init() { - if (queue == null) { - queue = new ConcurrentLinkedDeque<NotifyChangeRecordsAvailable>(); + + static void notifyConfigurationChange(ReplicationConfiguration oldConfig, ReplicationConfiguration newConfig) { + + //if the config is different } - timer = new PullTimerTask(); - - } - - private boolean Excluded(HighWaterMarkVectorType changesAlreadySeen, ChangeRecord r) { - if (changesAlreadySeen != null) { - for (int i = 0; i < changesAlreadySeen.getHighWaterMark().size(); i++) { - if (changesAlreadySeen.getHighWaterMark().get(i).getNodeID().equals(r.getChangeID().getNodeID()) - && changesAlreadySeen.getHighWaterMark().get(i).getOriginatingUSN().equals(r.getChangeID().getOriginatingUSN())) { - return true; + + private UDDIServiceCounter serviceCounter; + + private static PullTimerTask timer = null; + private long startBuffer = 20000l;//AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_START_BUFFER, 20000l); // 20s startup delay default + private long interval = 300000l;// AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 300000l); //5 min default + + private static UDDIPublicationImpl pub = null; + + public UDDIReplicationImpl() { + super(); + if (pub == null) { + pub = new UDDIPublicationImpl(); + } + serviceCounter = ServiceCounterLifecycleResource.getServiceCounter(UDDIReplicationImpl.class); + Init(); + try { + startBuffer = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_START_BUFFER, 20000l); // 20s startup delay default + interval = 5000;//AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 300000l); //5 min default + } catch (ConfigurationException ex) { + logger.fatal(ex); } - } + } - return false; - } - - private class PullTimerTask extends TimerTask { - - private Timer timer = null; - - public PullTimerTask() { - super(); - timer = new Timer(true); - timer.scheduleAtFixedRate(this, startBuffer, interval); + + private synchronized void Init() { + if (queue == null) { + queue = new ConcurrentLinkedDeque<NotifyChangeRecordsAvailable>(); + } + timer = new PullTimerTask(); + } - - @Override - public void run() { - - //ok someone told me there's a change available - while (!queue.isEmpty()) { - NotifyChangeRecordsAvailable poll = queue.poll(); - if (poll != null) { - UDDIReplicationPortType replicationClient = getReplicationClient(poll.getNotifyingNode()); - try { - //ok now get all the changes - List<ChangeRecord> records - = replicationClient.getChangeRecords(node, - null, null, poll.getChangesAvailable()); - //ok now we need to persist the change records - for (int i = 0; i < records.size(); i++) { - PersistChangeRecord(records.get(i)); + + private boolean Excluded(HighWaterMarkVectorType changesAlreadySeen, ChangeRecord r) { + if (changesAlreadySeen != null) { + for (int i = 0; i < changesAlreadySeen.getHighWaterMark().size(); i++) { + if (changesAlreadySeen.getHighWaterMark().get(i).getNodeID().equals(r.getChangeID().getNodeID()) + && changesAlreadySeen.getHighWaterMark().get(i).getOriginatingUSN().equals(r.getChangeID().getOriginatingUSN())) { + return true; + } } - } catch (Exception ex) { - logger.equals(ex); - } } - } - } - - @Override - public boolean cancel() { - timer.cancel(); - return super.cancel(); + return false; } /** - * someone told me there's a change available, we retrieved it and are - * processing the changes locally - * - * @param rec + * handles when a remote node tells me that there's an update(s) + * available */ - private void PersistChangeRecord(ChangeRecord rec) { - if (rec == null) { - return; - } - EntityManager em = PersistenceManager.getEntityManager(); - EntityTransaction tx = em.getTransaction(); - /** - * In nodes that support pre-bundled replication responses, the - * recipient of the get_changeRecords message MAY return more change - * records than requested by the caller. In this scenario, the - * caller MUST also be prepared to deal with such redundant changes - * where a USN is less than the USN specified in the - * changesAlreadySeen highWaterMarkVector. - */ - try { - tx.begin(); - //the change record rec must also be persisted!! - em.persist(MappingApiToModel.mapChangeRecord(rec)); - //<editor-fold defaultstate="collapsed" desc="delete a record"> - - if (rec.getChangeRecordDelete() != null) { - if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getBindingKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getBindingKey())) { - //delete a binding template - pub.deleteBinding(rec.getChangeRecordDelete().getBindingKey(), em); - } - if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getBusinessKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getBusinessKey())) { - //delete a business - pub.deleteBusiness(rec.getChangeRecordDelete().getBusinessKey(), em); - } - if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getServiceKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getServiceKey())) { - //delete a service - pub.deleteService(rec.getChangeRecordDelete().getServiceKey(), em); - } - if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getTModelKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getTModelKey())) { - //delete a tmodel - /** - * The changeRecordDelete for a tModel does not - * correspond to any API described in this specification - * and should only appear in the replication stream as - * the result of an administrative function to - * permanently remove a tModel. - */ - pub.deleteTModel(rec.getChangeRecordDelete().getTModelKey(), em); - } + private class PullTimerTask extends TimerTask { + + private Timer timer = null; + + public PullTimerTask() { + super(); + timer = new Timer(true); + timer.scheduleAtFixedRate(this, startBuffer, interval); } - if (rec.getChangeRecordDeleteAssertion() != null && rec.getChangeRecordDeleteAssertion().getPublisherAssertion() != null) { - //delete a pa template - pub.deletePublisherAssertion(rec.getChangeRecordDeleteAssertion().getPublisherAssertion(), em); + + @Override + public void run() { + + logger.info("Replication change puller thread started. Queue size: " + queue.size()); + //ok someone told me there's a change available + while (!queue.isEmpty()) { + NotifyChangeRecordsAvailable poll = queue.poll(); + if (poll != null) { + UDDIReplicationPortType replicationClient = getReplicationClient(poll.getNotifyingNode()); + if (replicationClient == null) { + logger.fatal("unable to obtain a replication client to node " + poll.getNotifyingNode()); + } + try { + //get the high water marks for this node + //ok now get all the changes + logger.info("fetching updates..."); + + List<ChangeRecord> records + = replicationClient.getChangeRecords(node, + poll.getChangesAvailable(), BigInteger.valueOf(100), null); + //ok now we need to persist the change records + logger.info("Change records retrieved " + records.size()); + for (int i = 0; i < records.size(); i++) { + PersistChangeRecord(records.get(i)); + } + } catch (Exception ex) { + logger.error("Error caught fetching replication changes from " + poll.getNotifyingNode(), ex); + } + } else { + logger.warn("weird, popped an object from the queue but it was null."); + } + } + } + + @Override + public boolean cancel() { + timer.cancel(); + return super.cancel(); } + /** + * someone told me there's a change available, we retrieved it + * and are processing the changes locally + * + * @param rec + */ + private void PersistChangeRecord(ChangeRecord rec) { + if (rec == null) { + return; + } + EntityManager em = PersistenceManager.getEntityManager(); + EntityTransaction tx = em.getTransaction(); + /** + * In nodes that support pre-bundled replication + * responses, the recipient of the get_changeRecords + * message MAY return more change records than requested + * by the caller. In this scenario, the caller MUST also + * be prepared to deal with such redundant changes where + * a USN is less than the USN specified in the + * changesAlreadySeen highWaterMarkVector. + */ + try { + tx.begin(); + //the change record rec must also be persisted!! + em.persist(MappingApiToModel.mapChangeRecord(rec)); + //<editor-fold defaultstate="collapsed" desc="delete a record"> + + if (rec.getChangeRecordDelete() != null) { + if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getBindingKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getBindingKey())) { + //delete a binding template + pub.deleteBinding(rec.getChangeRecordDelete().getBindingKey(), em); + } + if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getBusinessKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getBusinessKey())) { + //delete a business + pub.deleteBusiness(rec.getChangeRecordDelete().getBusinessKey(), em); + } + if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getServiceKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getServiceKey())) { + //delete a service + pub.deleteService(rec.getChangeRecordDelete().getServiceKey(), em); + } + if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getTModelKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getTModelKey())) { + //delete a tmodel + /** + * The changeRecordDelete for a + * tModel does not correspond to + * any API described in this + * specification and should only + * appear in the replication + * stream as the result of an + * administrative function to + * permanently remove a tModel. + */ + pub.deleteTModel(rec.getChangeRecordDelete().getTModelKey(), em); + } + } + if (rec.getChangeRecordDeleteAssertion() != null && rec.getChangeRecordDeleteAssertion().getPublisherAssertion() != null) { + //delete a pa template + pub.deletePublisherAssertion(rec.getChangeRecordDeleteAssertion().getPublisherAssertion(), em); + } + //</editor-fold> - //<editor-fold defaultstate="collapsed" desc="New Data"> - if (rec.getChangeRecordNewData() != null) { - - if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID() == null) { - throw new Exception("Inbound replication data is missiong node id!"); - } - - //The operationalInfo element MUST contain the operational information associated with the indicated new data. - if (rec.getChangeRecordNewData().getOperationalInfo() == null) { - logger.warn("Inbound replication data does not have the required OperationalInfo element and is NOT spec compliant. Data will be ignored"); - } else { - if (rec.getChangeRecordNewData().getBindingTemplate() != null) { + //<editor-fold defaultstate="collapsed" desc="New Data"> + if (rec.getChangeRecordNewData() != null) { + + if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID() == null) { + throw new Exception("Inbound replication data is missiong node id!"); + } + + //The operationalInfo element MUST contain the operational information associated with the indicated new data. + if (rec.getChangeRecordNewData().getOperationalInfo() == null) { + logger.warn("Inbound replication data does not have the required OperationalInfo element and is NOT spec compliant. Data will be ignored"); + } else { + if (rec.getChangeRecordNewData().getBindingTemplate() != null) { //fetch the binding template if it exists already - //if it exists, - // confirm the owning node, it shouldn't be the local node id, if it is, throw - // the owning node should be the same as it was before - - BusinessService model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewData().getBindingTemplate().getServiceKey()); - if (model == null) { - logger.error("Replication error, attempting to insert a binding where the service doesn't exist yet"); - } else { - ValidateNodeIdMatches(model.getNodeId(), rec.getChangeRecordNewData().getOperationalInfo()); - - org.apache.juddi.model.BindingTemplate modelT = new org.apache.juddi.model.BindingTemplate(); - MappingApiToModel.mapBindingTemplate(rec.getChangeRecordNewData().getBindingTemplate(), modelT, model); - MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo()); - em.persist(model); - } - - } else if (rec.getChangeRecordNewData().getBusinessEntity() != null) { - - BusinessEntity model = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewData().getBusinessEntity().getBusinessKey()); - if (model == null) { - model = new BusinessEntity(); - } else { - ValidateNodeIdMatches(model.getNodeId(), rec.getChangeRecordNewData().getOperationalInfo()); - } - MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model); - MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo()); - - em.persist(model); - - } - if (rec.getChangeRecordNewData().getBusinessService() != null) { - BusinessEntity find = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewData().getBusinessService().getBusinessKey()); - if (find == null) { - logger.error("Replication error, attempting to insert a service where the business doesn't exist yet"); - } else { - org.apache.juddi.model.BusinessService model = new org.apache.juddi.model.BusinessService(); - MappingApiToModel.mapBusinessService(rec.getChangeRecordNewData().getBusinessService(), model, find); - MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo()); - - em.persist(model); - } - - } else if (rec.getChangeRecordNewData().getTModel() != null) { - Tmodel model = new Tmodel(); - MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model); - - MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo()); - - em.persist(model); - } - - } - - } + //if it exists, + // confirm the owning node, it shouldn't be the local node id, if it is, throw + // the owning node should be the same as it was before + + BusinessService model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewData().getBindingTemplate().getServiceKey()); + if (model == null) { + logger.error("Replication error, attempting to insert a binding where the service doesn't exist yet"); + } else { + ValidateNodeIdMatches(model.getNodeId(), rec.getChangeRecordNewData().getOperationalInfo()); + + org.apache.juddi.model.BindingTemplate modelT = new org.apache.juddi.model.BindingTemplate(); + MappingApiToModel.mapBindingTemplate(rec.getChangeRecordNewData().getBindingTemplate(), modelT, model); + MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo()); + em.persist(model); + } + + } else if (rec.getChangeRecordNewData().getBusinessEntity() != null) { + + BusinessEntity model = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewData().getBusinessEntity().getBusinessKey()); + if (model == null) { + model = new BusinessEntity(); + } else { + ValidateNodeIdMatches(model.getNodeId(), rec.getChangeRecordNewData().getOperationalInfo()); + } + MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model); + MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo()); + + em.persist(model); + + } + if (rec.getChangeRecordNewData().getBusinessService() != null) { + BusinessEntity find = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewData().getBusinessService().getBusinessKey()); + if (find == null) { + logger.error("Replication error, attempting to insert a service where the business doesn't exist yet"); + } else { + org.apache.juddi.model.BusinessService model = new org.apache.juddi.model.BusinessService(); + MappingApiToModel.mapBusinessService(rec.getChangeRecordNewData().getBusinessService(), model, find); + MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo()); + + em.persist(model); + } + + } else if (rec.getChangeRecordNewData().getTModel() != null) { + Tmodel model = new Tmodel(); + MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model); + + MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo()); + + em.persist(model); + } + + } + + } //</editor-fold> // changeRecordNull no action needed - // changeRecordHide tmodel only - //<editor-fold defaultstate="collapsed" desc="hide tmodel"> - if (rec.getChangeRecordHide() != null) { - /* - A changeRecordHide element corresponds to the behavior of hiding a tModel described in the delete_tModel in the Publish API section of this Specification. A tModel listed in a changeRecordHide should be marked as hidden, so that it is not returned in response to a find_tModel API call. + // changeRecordHide tmodel only + //<editor-fold defaultstate="collapsed" desc="hide tmodel"> + if (rec.getChangeRecordHide() != null) { + /* + A changeRecordHide element corresponds to the behavior of hiding a tModel described in the delete_tModel in the Publish API section of this Specification. A tModel listed in a changeRecordHide should be marked as hidden, so that it is not returned in response to a find_tModel API call. - The changeRecordHide MUST contain a modified timestamp to allow multi-node registries to calculate consistent modifiedIncludingChildren timestamps as described in Section 3.8 operationalInfo Structure. - */ - String key = rec.getChangeRecordHide().getTModelKey(); - org.apache.juddi.model.Tmodel existing = em.find(org.apache.juddi.model.Tmodel.class, key); - if (existing == null) { - logger.error("Unexpected delete/hide tmodel message received for non existing key " + key); - } else { - existing.setDeleted(true); - existing.setModified(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime()); - existing.setModifiedIncludingChildren(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime()); - em.persist(existing); - } - } + The changeRecordHide MUST contain a modified timestamp to allow multi-node registries to calculate consistent modifiedIncludingChildren timestamps as described in Section 3.8 operationalInfo Structure. + */ + String key = rec.getChangeRecordHide().getTModelKey(); + org.apache.juddi.model.Tmodel existing = em.find(org.apache.juddi.model.Tmodel.class, key); + if (existing == null) { + logger.error("Unexpected delete/hide tmodel message received for non existing key " + key); + } else { + existing.setDeleted(true); + existing.setModified(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime()); + existing.setModifiedIncludingChildren(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime()); + em.persist(existing); + } + } //</editor-fold> - //<editor-fold defaultstate="collapsed" desc="changeRecordPublisherAssertion"> - if (rec.getChangeRecordPublisherAssertion() != null) { + //<editor-fold defaultstate="collapsed" desc="changeRecordPublisherAssertion"> + if (rec.getChangeRecordPublisherAssertion() != null) { //TODO implement - } + } //</editor-fold> - tx.commit(); - - } catch (Exception drfm) { - logger.warn(drfm); - } finally { - if (tx.isActive()) { - tx.rollback(); + tx.commit(); + + } catch (Exception drfm) { + logger.warn(drfm); + } finally { + if (tx.isActive()) { + tx.rollback(); + } + em.close(); + } } - em.close(); - } - } - - } - - private static void ValidateNodeIdMatches(String nodeId, OperationalInfo operationalInfo) throws Exception { - if (nodeId == null || operationalInfo == null) { - throw new Exception("either the local node ID is null or the inbound replication data's node id is null"); - } - if (!nodeId.equals(operationalInfo.getNodeID())) { - throw new Exception("node id mismatch!"); + } - } - - private synchronized UDDIReplicationPortType getReplicationClient(String node) { - if (cache.containsKey(node)) { - return cache.get(node); + + private static void ValidateNodeIdMatches(String nodeId, OperationalInfo operationalInfo) throws Exception { + if (nodeId == null || operationalInfo == null) { + throw new Exception("either the local node ID is null or the inbound replication data's node id is null"); + } + if (!nodeId.equals(operationalInfo.getNodeID())) { + throw new Exception("node id mismatch!"); + } } - UDDIService svc = new UDDIService(); - UDDIReplicationPortType replicationClient = svc.getUDDIReplicationPort(); - EntityManager em = PersistenceManager.getEntityManager(); - EntityTransaction tx = em.getTransaction(); - try { - Node find = em.find(org.apache.juddi.model.Node.class, node); - ((BindingProvider) replicationClient).getRequestContext().put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, find.getReplicationUrl()); - cache.put(node, replicationClient); - return replicationClient; - } catch (Exception ex) { - logger.fatal("Node not found!" + node, ex); - } finally { - if (tx.isActive()) { - tx.rollback(); - } - em.close(); + + private synchronized UDDIReplicationPortType getReplicationClient(String node) { + if (cache.containsKey(node)) { + return cache.get(node); + } + UDDIService svc = new UDDIService(); + UDDIReplicationPortType replicationClient = svc.getUDDIReplicationPort(); + + EntityManager em = PersistenceManager.getEntityManager(); + EntityTransaction tx = em.getTransaction(); + try { + StringBuilder sql = new StringBuilder(); + sql.append("select c from ReplicationConfiguration c order by c.serialNumber desc"); + sql.toString(); + Query qry = em.createQuery(sql.toString()); + qry.setMaxResults(1); + + org.apache.juddi.model.ReplicationConfiguration resultList = (org.apache.juddi.model.ReplicationConfiguration) qry.getSingleResult(); + for (Operator o : resultList.getOperator()) { + if (o.getOperatorNodeID().equalsIgnoreCase(node)) { + ((BindingProvider) replicationClient).getRequestContext().put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, o.getSoapReplicationURL()); + cache.put(node, replicationClient); + return replicationClient; + } + } + tx.rollback(); + + } catch (Exception ex) { + logger.fatal("Node not found!" + node, ex); + } finally { + if (tx.isActive()) { + tx.rollback(); + } + em.close(); + } + //em.close(); + return null; + } - em.close(); - return null; - - } - private Map<String, UDDIReplicationPortType> cache = new HashMap<String, UDDIReplicationPortType>(); - - /** - * @since 3.3 - * @param body - * @return - * @throws DispositionReportFaultMessage - */ - public String doPing(DoPing body) throws DispositionReportFaultMessage { - long startTime = System.currentTimeMillis(); - long procTime = System.currentTimeMillis() - startTime; - serviceCounter.update(ReplicationQuery.DO_PING, QueryStatus.SUCCESS, procTime); - - return node; - - } - - @Override - public List<ChangeRecord> getChangeRecords(String requestingNode, - HighWaterMarkVectorType changesAlreadySeen, - BigInteger responseLimitCount, - HighWaterMarkVectorType responseLimitVector) - throws DispositionReportFaultMessage { - long startTime = System.currentTimeMillis(); - - new ValidateReplication(null).validateGetChangeRecords(requestingNode, changesAlreadySeen, responseLimitCount, responseLimitVector, FetchEdges(), ctx); - - //TODO should we validate that "requestingNode" is in the replication config? - List<ChangeRecord> ret = new ArrayList<ChangeRecord>(); - EntityManager em = PersistenceManager.getEntityManager(); - EntityTransaction tx = em.getTransaction(); + private Map<String, UDDIReplicationPortType> cache = new HashMap<String, UDDIReplicationPortType>(); /** - * More specifically, the recipient determines the particular change - * records that are returned by comparing the originating USNs in the - * callerâs high water mark vector with the originating USNs of each of - * the changes the recipient has seen from others or generated by - * itself. The recipient SHOULD only return change records that have - * originating USNs that are greater than those listed in the - * changesAlreadySeen highWaterMarkVector and less than the limit - * required by either the responseLimitCount or the responseLimitVector. - * - * - * Part of the message is a high water mark vector that contains for - * each node of the registry the originating USN of the most recent - * change record that has been successfully processed by the invocating - * node + * @since 3.3 + * @param body + * @return + * @throws DispositionReportFaultMessage */ - int maxrecords = 100; - if (responseLimitCount != null) { - maxrecords = responseLimitCount.intValue(); + public String doPing(DoPing body) throws DispositionReportFaultMessage { + long startTime = System.currentTimeMillis(); + long procTime = System.currentTimeMillis() - startTime; + serviceCounter.update(ReplicationQuery.DO_PING, QueryStatus.SUCCESS, procTime); + + return node; + } - try { - tx.begin(); - Long firstrecord = 1L; - Long lastrecord = null; - - if (changesAlreadySeen != null) { - //this is basically a lower limit (i.e. the newest record that was processed by the requestor - //therefore we want the oldest record stored locally to return to the requestor for processing - for (int i = 0; i < changesAlreadySeen.getHighWaterMark().size(); i++) { - if (responseLimitVector.getHighWaterMark().get(i).getNodeID().equals(node)) { - firstrecord = changesAlreadySeen.getHighWaterMark().get(i).getOriginatingUSN() + 1; - } + + @WebResult(name = "changeRecord", targetNamespace = "urn:uddi-org:repl_v3") + @RequestWrapper(localName = "get_changeRecords", targetNamespace = "urn:uddi-org:repl_v3", className = "org.uddi.repl_v3.GetChangeRecords") + @ResponseWrapper(localName = "changeRecords", targetNamespace = "urn:uddi-org:repl_v3", className = "org.uddi.repl_v3.ChangeRecords") + + @Override + public List<ChangeRecord> getChangeRecords(@WebParam(name = "requestingNode", targetNamespace = "urn:uddi-org:repl_v3") String requestingNode, + @WebParam(name = "changesAlreadySeen", targetNamespace = "urn:uddi-org:repl_v3") HighWaterMarkVectorType changesAlreadySeen, + @WebParam(name = "responseLimitCount", targetNamespace = "urn:uddi-org:repl_v3") BigInteger responseLimitCount, + @WebParam(name = "responseLimitVector", targetNamespace = "urn:uddi-org:repl_v3") HighWaterMarkVectorType responseLimitVector) + throws DispositionReportFaultMessage { + long startTime = System.currentTimeMillis(); + + new ValidateReplication(null).validateGetChangeRecords(requestingNode, changesAlreadySeen, responseLimitCount, responseLimitVector, FetchEdges(), ctx); + + //TODO should we validate that "requestingNode" is in the replication config? + List<ChangeRecord> ret = new ArrayList<ChangeRecord>(); + EntityManager em = PersistenceManager.getEntityManager(); + EntityTransaction tx = em.getTransaction(); + + /** + * More specifically, the recipient determines the particular + * change records that are returned by comparing the originating + * USNs in the callerâs high water mark vector with the + * originating USNs of each of the changes the recipient has + * seen from others or generated by itself. The recipient SHOULD + * only return change records that have originating USNs that + * are greater than those listed in the changesAlreadySeen + * highWaterMarkVector and less than the limit required by + * either the responseLimitCount or the responseLimitVector. + * + * + * Part of the message is a high water mark vector that contains + * for each node of the registry the originating USN of the most + * recent change record that has been successfully processed by + * the invocating node + */ + int maxrecords = 100; + if (responseLimitCount != null) { + maxrecords = responseLimitCount.intValue(); } - } - if (responseLimitVector != null) { + try { + tx.begin(); + Long firstrecord = 1L; + Long lastrecord = null; + + if (changesAlreadySeen != null) { + //this is basically a lower limit (i.e. the newest record that was processed by the requestor + //therefore we want the oldest record stored locally to return to the requestor for processing + for (int i = 0; i < changesAlreadySeen.getHighWaterMark().size(); i++) { + if (changesAlreadySeen.getHighWaterMark().get(i).getNodeID().equals(node)) { + firstrecord = changesAlreadySeen.getHighWaterMark().get(i).getOriginatingUSN() + 1; + } + } + } + if (responseLimitVector != null) { //using responseLimitVector, indicating for each node in the graph the first change originating there that he does not wish to be returned. - //upper limit basically - for (int i = 0; i < responseLimitVector.getHighWaterMark().size(); i++) { - if (responseLimitVector.getHighWaterMark().get(i).getNodeID().equals(node)) { - lastrecord = responseLimitVector.getHighWaterMark().get(i).getOriginatingUSN(); - } - } - } - - Query createQuery = null; - if (lastrecord != null) { - createQuery = em.createQuery("select e from ChangeRecord e where (e.id > :inbound and e.nodeID = :node and e.id < :lastrecord) OR (e.originatingUSN > :inbound and e.nodeID != :node and e.originatingUSN < :lastrecord) order by e.id ASC"); - createQuery.setParameter("lastrecord", lastrecord); - } else { - createQuery = em.createQuery("select e from ChangeRecord e where (e.id > :inbound and e.nodeID = :node) OR (e.originatingUSN > :inbound and e.nodeID != :node) order by e.id ASC"); - } - createQuery.setMaxResults(maxrecords); - createQuery.setParameter("inbound", firstrecord); - createQuery.setParameter("node", node); - - List<org.apache.juddi.model.ChangeRecord> records = (List<org.apache.juddi.model.ChangeRecord>) (org.apache.juddi.model.ChangeRecord) createQuery.getResultList(); - for (int i = 0; i < records.size(); i++) { - ChangeRecord r = MappingModelToApi.mapChangeRecord(records.get(i)); - if (!Excluded(changesAlreadySeen, r)) { - ret.add(r); + //upper limit basically + for (int i = 0; i < responseLimitVector.getHighWaterMark().size(); i++) { + if (responseLimitVector.getHighWaterMark().get(i).getNodeID().equals(node)) { + lastrecord = responseLimitVector.getHighWaterMark().get(i).getOriginatingUSN(); + } + } + } + + Query createQuery = null; + if (lastrecord != null) { + createQuery = em.createQuery("select e from ChangeRecord e where ((e.id >= :inbound and e.nodeID = :node and e.id < :lastrecord) OR " + + "(e.originatingUSN > :inbound and e.nodeID <> :node and e.originatingUSN < :lastrecord)) order by e.id ASC"); + createQuery.setParameter("lastrecord", lastrecord); + } else { + createQuery = em.createQuery("select e from ChangeRecord e where ((e.id >= :inbound and e.nodeID = :node) OR " + + "(e.originatingUSN > :inbound and e.nodeID <> :node)) order by e.id ASC"); + } + createQuery.setMaxResults(maxrecords); + createQuery.setParameter("inbound", firstrecord); + createQuery.setParameter("node", node); + + List<org.apache.juddi.model.ChangeRecord> records = (List<org.apache.juddi.model.ChangeRecord>) createQuery.getResultList(); + for (int i = 0; i < records.size(); i++) { + ChangeRecord r = MappingModelToApi.mapChangeRecord(records.get(i)); + if (!Excluded(changesAlreadySeen, r)) { + ret.add(r); + } + + } + + tx.rollback(); + long procTime = System.currentTimeMillis() - startTime; + serviceCounter.update(ReplicationQuery.GET_CHANGERECORDS, + QueryStatus.SUCCESS, procTime); + + } catch (Exception ex) { + logger.fatal("Error, this node is: " + node, ex); + throw new FatalErrorException(new ErrorMessage("E_fatalError", ex.getMessage())); + + } finally { + if (tx.isActive()) { + tx.rollback(); + } + em.close(); } - - } - - tx.rollback(); - long procTime = System.currentTimeMillis() - startTime; - serviceCounter.update(ReplicationQuery.GET_CHANGERECORDS, - QueryStatus.SUCCESS, procTime); - - } catch (Exception ex) { - logger.fatal("Error, this node is: " + node, ex); - throw new FatalErrorException(new ErrorMessage("E_fatalError", ex.getMessage())); - - } finally { - if (tx.isActive()) { - tx.rollback(); - } - em.close(); + logger.info("Change records returned for " + requestingNode + ": " + ret.size()); + //JAXB.marshal(ret, System.out); + return ret; } - return ret; - } - - /** - * This UDDI API message provides a means to obtain a list of highWaterMark - * element containing the highest known USN for all nodes in the replication - * graph. If there is no graph, we just return the local bits - * - * @return - * @throws DispositionReportFaultMessage - */ - @Override - public List<ChangeRecordIDType> getHighWaterMarks() - throws DispositionReportFaultMessage { - long startTime = System.currentTimeMillis(); - - List<ChangeRecordIDType> ret = new ArrayList<ChangeRecordIDType>(); - - //fetch from database the highest known watermark - ReplicationConfiguration FetchEdges = FetchEdges(); - - EntityManager em = PersistenceManager.getEntityManager(); - EntityTransaction tx = em.getTransaction(); - try { - tx.begin(); - if (FetchEdges != null) { - Iterator<String> it = FetchEdges.getCommunicationGraph().getNode().iterator(); - while (it.hasNext()) { - String nextNode = it.next(); - if (!nextNode.equals(node)) { - - Long id = 0L; - try { - id = (Long) em.createQuery("select e.originatingUSN from ChangeRecord e where e.nodeID = :node order by e.originatingUSN desc").setParameter("node", nextNode).setMaxResults(1).getSingleResult(); - } catch (Exception ex) { - logger.debug(ex); + + /** + * This UDDI API message provides a means to obtain a list of + * highWaterMark element containing the highest known USN for all nodes + * in the replication graph. If there is no graph, we just return the + * local bits + * + * @return + * @throws DispositionReportFaultMessage + */ + @Override + public List<ChangeRecordIDType> getHighWaterMarks() + throws DispositionReportFaultMessage { + long startTime = System.currentTimeMillis(); + + List<ChangeRecordIDType> ret = new ArrayList<ChangeRecordIDType>(); + + //fetch from database the highest known watermark + ReplicationConfiguration FetchEdges = FetchEdges(); + + EntityManager em = PersistenceManager.getEntityManager(); + EntityTransaction tx = em.getTransaction(); + try { + tx.begin(); + if (FetchEdges != null) { + Iterator<String> it = FetchEdges.getCommunicationGraph().getNode().iterator(); + while (it.hasNext()) { + String nextNode = it.next(); + if (!nextNode.equals(node)) { + + Long id = 0L; + try { + id = (Long) em.createQuery("select e.originatingUSN from ChangeRecord e where e.nodeID = :node order by e.originatingUSN desc").setParameter("node", nextNode).setMaxResults(1).getSingleResult(); + } catch (Exception ex) { + logger.debug(ex); + } + if (id == null) { + id = 0L; + //per the spec + } + ChangeRecordIDType x = new ChangeRecordIDType(nextNode, id); + + ret.add(x); + + } + } } + //dont forget this node + Long id = (Long) em.createQuery("select (e.id) from ChangeRecord e where e.nodeID = :node order by e.id desc").setParameter("node", node).setMaxResults(1).getSingleResult(); if (id == null) { - id = 0L; - //per the spec + id = 0L; } - ChangeRecordIDType x = new ChangeRecordIDType(nextNode, id); - + ChangeRecordIDType x = new ChangeRecordIDType(); + x.setNodeID(node); + x.setOriginatingUSN(id); ret.add(x); - - } + + tx.rollback(); + long procTime = System.currentTimeMillis() - startTime; + serviceCounter.update(ReplicationQuery.GET_HIGHWATERMARKS, QueryStatus.SUCCESS, procTime); + + } catch (Exception drfm) { + throw new FatalErrorException(new ErrorMessage("E_fatalError", drfm.getMessage())); + + } finally { + if (tx.isActive()) { + tx.rollback(); + } + em.close(); } - } - //dont forget this node - Long id = (Long) em.createQuery("select (e.id) from ChangeRecord e where e.nodeID = :node order by e.id desc").setParameter("node", node).setMaxResults(1).getSingleResult(); - if (id == null) { - id = 0L; - } - ChangeRecordIDType x = new ChangeRecordIDType(); - x.setNodeID(node); - x.setOriginatingUSN(id); - ret.add(x); - - tx.rollback(); - long procTime = System.currentTimeMillis() - startTime; - serviceCounter.update(ReplicationQuery.GET_HIGHWATERMARKS, QueryStatus.SUCCESS, procTime); - - } catch (Exception drfm) { - throw new FatalErrorException(new ErrorMessage("E_fatalError", drfm.getMessage())); - - } finally { - if (tx.isActive()) { - tx.rollback(); - } - em.close(); + + return ret; } - - - - return ret; - } - - /** - * this means that another node has a change and we need to pick up the - * change and apply it to our local database. - * - * @param body - * @throws DispositionReportFaultMessage - */ - @Override - public void notifyChangeRecordsAvailable(NotifyChangeRecordsAvailable body) - throws DispositionReportFaultMessage { - long startTime = System.currentTimeMillis(); - long procTime = System.currentTimeMillis() - startTime; - serviceCounter.update(ReplicationQuery.NOTIFY_CHANGERECORDSAVAILABLE, - QueryStatus.SUCCESS, procTime); + + /** + * this means that another node has a change and we need to pick up the + * change and apply it to our local database. + * + * @param body + * @throws DispositionReportFaultMessage + */ + @Override + public void notifyChangeRecordsAvailable(NotifyChangeRecordsAvailable body) + throws DispositionReportFaultMessage { + long startTime = System.currentTimeMillis(); + long procTime = System.currentTimeMillis() - startTime; + serviceCounter.update(ReplicationQuery.NOTIFY_CHANGERECORDSAVAILABLE, + QueryStatus.SUCCESS, procTime); //some other node just told us there's new records available, call - //getChangeRecords from the remote node asynch - - new ValidateReplication(null).validateNotifyChangeRecordsAvailable(body, ctx); - - queue.add(body); - - //ValidateReplication.unsupportedAPICall(); - } - private static Queue<NotifyChangeRecordsAvailable> queue = null; - - /** - * transfers custody of an entity from node1/user1 to node2/user2 - * - * @param body - * @throws DispositionReportFaultMessage - */ - @Override - public void transferCustody(TransferCustody body) - throws DispositionReportFaultMessage { - long startTime = System.currentTimeMillis(); + //getChangeRecords from the remote node asynch - //*this node is transfering data to another node - //body.getTransferOperationalInfo(). - ValidateReplication.unsupportedAPICall(); - - EntityManager em = PersistenceManager.getEntityManager(); - //EntityTransaction tx = em.getTransaction(); + new ValidateReplication(null).validateNotifyChangeRecordsAvailable(body, ctx); + + logger.info(body.getNotifyingNode() + " just told me that there are change records available, enqueuing..."); + queue.add(body); + + //ValidateReplication.unsupportedAPICall(); + } + private static Queue<NotifyChangeRecordsAvailable> queue = null; /** - * The custodial node must verify that it has granted permission to - * transfer the entities identified and that this permission is still - * valid. This operation is comprised of two steps: - * - * 1. Verification that the transferToken was issued by it, that it has - * not expired, that it represents the authority to transfer no more and - * no less than those entities identified by the businessKey and - * tModelKey elements and that all these entities are still valid and - * not yet transferred. The transferToken is invalidated if any of these - * conditions are not met. - * - * 2. If the conditions above are met, the custodial node will prevent - * any further changes to the entities identified by the businessKey and - * tModelKey elements identified. The entity will remain in this state - * until the replication stream indicates it has been successfully - * processed via the replication stream. Upon successful verification of - * the custody transfer request by the custodial node, an empty message - * is returned by it indicating the success of the request and - * acknowledging the custody transfer. Following the issue of the empty - * message, the custodial node will submit into the replication stream a - * changeRecordNewData providing in the operationalInfo, the nodeID - * accepting custody of the datum and the authorizedName of the - * publisher accepting ownership. The acknowledgmentRequested attribute - * of this change record MUST be set to "true". + * transfers custody of an entity from node1/user1 to node2/user2 * - * TODO enqueue Replication message - * - * Finally, the custodial node invalidates the transferToken in order to - * prevent additional calls of the transfer_entities API. + * @param body + * @throws DispositionReportFaultMessage */ - DiscardTransferToken dtt = new DiscardTransferToken(); - dtt.setKeyBag(body.getKeyBag()); - dtt.setTransferToken(body.getTransferToken()); - new UDDICustodyTransferImpl().discardTransferToken(dtt); - } - + @Override + public void transferCustody(TransferCustody body) + throws DispositionReportFaultMessage { + long startTime = System.currentTimeMillis(); + + //*this node is transfering data to another node + //body.getTransferOperationalInfo(). + ValidateReplication.unsupportedAPICall(); + + EntityManager em = PersistenceManager.getEntityManager(); + //EntityTransaction tx = em.getTransaction(); + + /** + * The custodial node must verify that it has granted permission + * to transfer the entities identified and that this permission + * is still valid. This operation is comprised of two steps: + * + * 1. Verification that the transferToken was issued by it, that + * it has not expired, that it represents the authority to + * transfer no more and no less than those entities identified + * by the businessKey and tModelKey elements and that all these + * entities are still valid and not yet transferred. The + * transferToken is invalidated if any of these conditions are + * not met. + * + * 2. If the conditions above are met, the custodial node will + * prevent any further changes to the entities identified by the + * businessKey and tModelKey elements identified. The entity + * will remain in this state until the replication stream + * indicates it has been successfully processed via the + * replication stream. Upon successful verification of the + * custody transfer request by the custodial node, an empty + * message is returned by it indicating the success of the + * request and acknowledging the custody transfer. Following the + * issue of the empty message, the custodial node will submit + * into the replication stream a changeRecordNewData providing + * in the operationalInfo, the nodeID accepting custody of the + * datum and the authorizedName of the publisher accepting + * ownership. The acknowledgmentRequested attribute of this + * change record MUST be set to "true". + * + * TODO enqueue Replication message + * + * Finally, the custodial node invalidates the transferToken in + * order to prevent additional calls of the transfer_entities + * API. + */ + DiscardTransferToken dtt = new DiscardTransferToken(); + dtt.setKeyBag(body.getKeyBag()); + dtt.setTransferToken(body.getTransferToken()); + new UDDICustodyTransferImpl().discardTransferToken(dtt); + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
