JUDDI-241 Basic functionality of replication is working, although there is some strange duplication of Name elements on replicated business entities.
Project: http://git-wip-us.apache.org/repos/asf/juddi/repo Commit: http://git-wip-us.apache.org/repos/asf/juddi/commit/939ae47f Tree: http://git-wip-us.apache.org/repos/asf/juddi/tree/939ae47f Diff: http://git-wip-us.apache.org/repos/asf/juddi/diff/939ae47f Branch: refs/heads/master Commit: 939ae47f6eb4c9a14f93ca65f5948d386f84c0ee Parents: 874d02b Author: Alex <[email protected]> Authored: Sat Dec 6 17:24:47 2014 -0500 Committer: Alex <[email protected]> Committed: Sat Dec 6 17:24:47 2014 -0500 ---------------------------------------------------------------------- .../juddi/api/impl/UDDIReplicationImpl.java | 126 ++-- .../apache/juddi/mapping/MappingApiToModel.java | 67 ++ .../juddi/replication/ReplicationNotifier.java | 4 +- .../subscription/SubscriptionNotifier.java | 745 ++++++++++--------- .../juddi/api/impl/API_160_ReplicationTest.java | 8 +- .../apache/juddi/api/runtime/CLIServerTest.java | 3 +- .../apache/juddi/api/runtime/replicantImpl.java | 14 +- .../apache/juddi/samples/UddiReplication.java | 11 +- uddi-ws/pom.xml | 6 + .../org/uddi/repl_v3/ChangeRecordDelete.java | 8 +- .../v3_service/UDDIReplicationPortType.java | 10 +- .../juddi/api_v3/GetPublisherDetailTest.java | 7 + 12 files changed, 583 insertions(+), 426 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/juddi/blob/939ae47f/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java ---------------------------------------------------------------------- diff --git a/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java b/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java index 3971e6b..30cfdec 100644 --- a/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java +++ b/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java @@ -17,6 +17,7 @@ package org.apache.juddi.api.impl; import java.math.BigInteger; +import 
java.rmi.RemoteException; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -26,21 +27,16 @@ import java.util.Queue; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.logging.Level; -import java.util.logging.Logger; -import javax.jws.WebMethod; import javax.jws.WebParam; import javax.jws.WebResult; import javax.jws.WebService; +import javax.jws.soap.SOAPBinding; import javax.persistence.EntityManager; import javax.persistence.EntityTransaction; import javax.persistence.Query; import javax.xml.bind.JAXB; import javax.xml.bind.annotation.XmlSeeAlso; -import javax.xml.datatype.DatatypeConfigurationException; import javax.xml.ws.BindingProvider; -import javax.xml.ws.RequestWrapper; -import javax.xml.ws.ResponseWrapper; import org.apache.commons.configuration.ConfigurationException; import static org.apache.juddi.api.impl.AuthenticatedService.logger; import org.apache.juddi.api.util.QueryStatus; @@ -52,7 +48,6 @@ import org.apache.juddi.mapping.MappingApiToModel; import org.apache.juddi.mapping.MappingModelToApi; import org.apache.juddi.model.BusinessEntity; import org.apache.juddi.model.BusinessService; -import org.apache.juddi.model.Node; import org.apache.juddi.model.Operator; import org.apache.juddi.model.Tmodel; import static org.apache.juddi.replication.ReplicationNotifier.FetchEdges; @@ -64,7 +59,9 @@ import org.uddi.api_v3.OperationalInfo; import org.uddi.custody_v3.DiscardTransferToken; import org.uddi.repl_v3.ChangeRecord; import org.uddi.repl_v3.ChangeRecordIDType; +import org.uddi.repl_v3.ChangeRecords; import org.uddi.repl_v3.DoPing; +import org.uddi.repl_v3.GetChangeRecords; import org.uddi.repl_v3.HighWaterMarkVectorType; import org.uddi.repl_v3.NotifyChangeRecordsAvailable; import org.uddi.repl_v3.ReplicationConfiguration; @@ -176,22 +173,32 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep UDDIReplicationPortType 
replicationClient = getReplicationClient(poll.getNotifyingNode()); if (replicationClient == null) { logger.fatal("unable to obtain a replication client to node " + poll.getNotifyingNode()); - } - try { + } else { + try { //get the high water marks for this node - //ok now get all the changes - logger.info("fetching updates..."); - - List<ChangeRecord> records - = replicationClient.getChangeRecords(node, - poll.getChangesAvailable(), BigInteger.valueOf(100), null); - //ok now we need to persist the change records - logger.info("Change records retrieved " + records.size()); - for (int i = 0; i < records.size(); i++) { - PersistChangeRecord(records.get(i)); + //ok now get all the changes + logger.info("fetching updates on, since "); + for (int xx = 0; xx < poll.getChangesAvailable().getHighWaterMark().size(); xx++) { + logger.info("Node " + poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID() + + " USN " + poll.getChangesAvailable().getHighWaterMark().get(xx).getOriginatingUSN()); + } + //JAXB.marshal(poll, System.out); + GetChangeRecords body = new GetChangeRecords(); + body.setRequestingNode(node); + body.setResponseLimitCount(BigInteger.valueOf(100)); + //indexing is screwed up + body.setChangesAlreadySeen(poll.getChangesAvailable()); + List<ChangeRecord> records + = replicationClient.getChangeRecords(body).getChangeRecord(); + //ok now we need to persist the change records + logger.info("Change records retrieved " + records.size()); + for (int i = 0; i < records.size(); i++) { + logger.info("Change records retrieved " + records.get(i).getChangeID().getNodeID() + " USN " + records.get(i).getChangeID().getOriginatingUSN()); + PersistChangeRecord(records.get(i)); + } + } catch (Exception ex) { + logger.error("Error caught fetching replication changes from " + poll.getNotifyingNode(), ex); } - } catch (Exception ex) { - logger.error("Error caught fetching replication changes from " + poll.getNotifyingNode(), ex); } } else { logger.warn("weird, popped an 
object from the queue but it was null."); @@ -226,10 +233,14 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep * a USN is less than the USN specified in the * changesAlreadySeen highWaterMarkVector. */ + logger.info("Remote change request"); + JAXB.marshal(rec, System.out); try { tx.begin(); //the change record rec must also be persisted!! - em.persist(MappingApiToModel.mapChangeRecord(rec)); + org.apache.juddi.model.ChangeRecord mapChangeRecord = MappingApiToModel.mapChangeRecord(rec); + mapChangeRecord.setId(null); + em.persist(mapChangeRecord); //<editor-fold defaultstate="collapsed" desc="delete a record"> if (rec.getChangeRecordDelete() != null) { @@ -291,7 +302,8 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep org.apache.juddi.model.BindingTemplate modelT = new org.apache.juddi.model.BindingTemplate(); MappingApiToModel.mapBindingTemplate(rec.getChangeRecordNewData().getBindingTemplate(), modelT, model); - MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo()); + MappingApiToModel.mapOperationalInfo(modelT, rec.getChangeRecordNewData().getOperationalInfo()); + // MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo()); em.persist(model); } @@ -304,8 +316,9 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep ValidateNodeIdMatches(model.getNodeId(), rec.getChangeRecordNewData().getOperationalInfo()); } MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model); - MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo()); + // MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo()); + MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo()); em.persist(model); } @@ -317,6 +330,7 @@ public class 
UDDIReplicationImpl extends AuthenticatedService implements UDDIRep org.apache.juddi.model.BusinessService model = new org.apache.juddi.model.BusinessService(); MappingApiToModel.mapBusinessService(rec.getChangeRecordNewData().getBusinessService(), model, find); MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo()); + MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo()); em.persist(model); } @@ -366,7 +380,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep tx.commit(); } catch (Exception drfm) { - logger.warn(drfm); + logger.warn("Error persisting change record!", drfm); } finally { if (tx.isActive()) { tx.rollback(); @@ -441,17 +455,28 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep } - @WebResult(name = "changeRecord", targetNamespace = "urn:uddi-org:repl_v3") - @RequestWrapper(localName = "get_changeRecords", targetNamespace = "urn:uddi-org:repl_v3", className = "org.uddi.repl_v3.GetChangeRecords") - @ResponseWrapper(localName = "changeRecords", targetNamespace = "urn:uddi-org:repl_v3", className = "org.uddi.repl_v3.ChangeRecords") + @SOAPBinding(parameterStyle = SOAPBinding.ParameterStyle.BARE) + @WebResult(name = "changeRecords", targetNamespace = "urn:uddi-org:repl_v3", partName = "body") + // @WebMethod(operationName = "get_changeRecords", action = "get_changeRecords") + public org.uddi.repl_v3.ChangeRecords getChangeRecords( + @WebParam(partName = "body", name = "get_changeRecords", targetNamespace = "urn:uddi-org:repl_v3") org.uddi.repl_v3.GetChangeRecords body + ) throws DispositionReportFaultMessage, RemoteException {/* + @WebResult(name = "changeRecord", targetNamespace = "urn:uddi-org:repl_v3") + @RequestWrapper(localName = "get_changeRecords", targetNamespace = "urn:uddi-org:repl_v3", className = "org.uddi.repl_v3.GetChangeRecords") + @ResponseWrapper(localName = "changeRecords", 
targetNamespace = "urn:uddi-org:repl_v3", className = "org.uddi.repl_v3.ChangeRecords") + + @Override + public List<ChangeRecord> getChangeRecords(@WebParam(name = "requestingNode", targetNamespace = "urn:uddi-org:repl_v3") String requestingNode, + @WebParam(name = "changesAlreadySeen", targetNamespace = "urn:uddi-org:repl_v3") HighWaterMarkVectorType changesAlreadySeen, + @WebParam(name = "responseLimitCount", targetNamespace = "urn:uddi-org:repl_v3") BigInteger responseLimitCount, + @WebParam(name = "responseLimitVector", targetNamespace = "urn:uddi-org:repl_v3") HighWaterMarkVectorType responseLimitVector) + throws DispositionReportFaultMessage {*/ - @Override - public List<ChangeRecord> getChangeRecords(@WebParam(name = "requestingNode", targetNamespace = "urn:uddi-org:repl_v3") String requestingNode, - @WebParam(name = "changesAlreadySeen", targetNamespace = "urn:uddi-org:repl_v3") HighWaterMarkVectorType changesAlreadySeen, - @WebParam(name = "responseLimitCount", targetNamespace = "urn:uddi-org:repl_v3") BigInteger responseLimitCount, - @WebParam(name = "responseLimitVector", targetNamespace = "urn:uddi-org:repl_v3") HighWaterMarkVectorType responseLimitVector) - throws DispositionReportFaultMessage { long startTime = System.currentTimeMillis(); + String requestingNode = body.getRequestingNode(); + HighWaterMarkVectorType changesAlreadySeen = body.getChangesAlreadySeen(); + BigInteger responseLimitCount = body.getResponseLimitCount(); + HighWaterMarkVectorType responseLimitVector = body.getResponseLimitVector(); new ValidateReplication(null).validateGetChangeRecords(requestingNode, changesAlreadySeen, responseLimitCount, responseLimitVector, FetchEdges(), ctx); @@ -483,7 +508,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep } try { tx.begin(); - Long firstrecord = 1L; + Long firstrecord = 0L; Long lastrecord = null; if (changesAlreadySeen != null) { @@ -491,7 +516,7 @@ public class UDDIReplicationImpl extends 
AuthenticatedService implements UDDIRep //therefore we want the oldest record stored locally to return to the requestor for processing for (int i = 0; i < changesAlreadySeen.getHighWaterMark().size(); i++) { if (changesAlreadySeen.getHighWaterMark().get(i).getNodeID().equals(node)) { - firstrecord = changesAlreadySeen.getHighWaterMark().get(i).getOriginatingUSN() + 1; + firstrecord = changesAlreadySeen.getHighWaterMark().get(i).getOriginatingUSN(); } } } @@ -505,25 +530,31 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep } } + logger.info("Query db for replication changes, lower index is " + (firstrecord - 1) + " last index " + lastrecord + " record limit " + maxrecords); Query createQuery = null; if (lastrecord != null) { - createQuery = em.createQuery("select e from ChangeRecord e where ((e.id >= :inbound and e.nodeID = :node and e.id < :lastrecord) OR " - + "(e.originatingUSN > :inbound and e.nodeID <> :node and e.originatingUSN < :lastrecord)) order by e.id ASC"); + createQuery = em.createQuery("select e from ChangeRecord e where " + + "((e.id > :inbound AND e.nodeID = :node AND e.id < :lastrecord) OR " + + "(e.originatingUSN > :inbound AND e.nodeID <> :node AND e.originatingUSN < :lastrecord)) " + + "order by e.id ASC"); createQuery.setParameter("lastrecord", lastrecord); } else { - createQuery = em.createQuery("select e from ChangeRecord e where ((e.id >= :inbound and e.nodeID = :node) OR " - + "(e.originatingUSN > :inbound and e.nodeID <> :node)) order by e.id ASC"); + createQuery = em.createQuery("select e from ChangeRecord e where " + + "((e.id > :inbound AND e.nodeID = :node) OR " + + "(e.originatingUSN > :inbound AND e.nodeID <> :node)) " + + "order by e.id ASC"); } createQuery.setMaxResults(maxrecords); - createQuery.setParameter("inbound", firstrecord); + createQuery.setParameter("inbound", firstrecord - 1); createQuery.setParameter("node", node); List<org.apache.juddi.model.ChangeRecord> records = 
(List<org.apache.juddi.model.ChangeRecord>) createQuery.getResultList(); + logger.info(records.size() + " CR records returned from query"); for (int i = 0; i < records.size(); i++) { ChangeRecord r = MappingModelToApi.mapChangeRecord(records.get(i)); - if (!Excluded(changesAlreadySeen, r)) { - ret.add(r); - } + //if (!Excluded(changesAlreadySeen, r)) { + ret.add(r); + //} } @@ -544,7 +575,9 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep } logger.info("Change records returned for " + requestingNode + ": " + ret.size()); //JAXB.marshal(ret, System.out); - return ret; + ChangeRecords x = new ChangeRecords(); + x.getChangeRecord().addAll(ret); + return x; } /** @@ -639,9 +672,8 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep new ValidateReplication(null).validateNotifyChangeRecordsAvailable(body, ctx); - logger.info(body.getNotifyingNode() + " just told me that there are change records available, enqueuing..."); queue.add(body); - + logger.info(body.getNotifyingNode() + " just told me that there are change records available, enqueuing...size is " + queue.size()); //ValidateReplication.unsupportedAPICall(); } private static Queue<NotifyChangeRecordsAvailable> queue = null; http://git-wip-us.apache.org/repos/asf/juddi/blob/939ae47f/juddi-core/src/main/java/org/apache/juddi/mapping/MappingApiToModel.java ---------------------------------------------------------------------- diff --git a/juddi-core/src/main/java/org/apache/juddi/mapping/MappingApiToModel.java b/juddi-core/src/main/java/org/apache/juddi/mapping/MappingApiToModel.java index 83a1492..63af296 100644 --- a/juddi-core/src/main/java/org/apache/juddi/mapping/MappingApiToModel.java +++ b/juddi-core/src/main/java/org/apache/juddi/mapping/MappingApiToModel.java @@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.juddi.jaxb.JAXBMarshaller; import org.apache.juddi.model.Address; import 
org.apache.juddi.model.BindingTemplate; +import org.apache.juddi.model.BusinessEntity; import org.apache.juddi.model.BusinessService; import org.apache.juddi.model.CanonicalizationMethod; import org.apache.juddi.model.Contact; @@ -1381,6 +1382,27 @@ public class MappingApiToModel { public static org.apache.juddi.model.ChangeRecord mapChangeRecord(ChangeRecord rec) throws UnsupportedEncodingException { org.apache.juddi.model.ChangeRecord r = new org.apache.juddi.model.ChangeRecord(); r.setId(rec.getChangeID().getOriginatingUSN()); + r.setOriginatingUSN(rec.getChangeID().getOriginatingUSN()); + if (rec.getChangeRecordNewData()!=null) + r.setRecordType(org.apache.juddi.model.ChangeRecord.RecordType.ChangeRecordNewData); + else if (rec.getChangeRecordAcknowledgement()!=null) + r.setRecordType(org.apache.juddi.model.ChangeRecord.RecordType.ChangeRecordAcknowledgement); + else if (rec.getChangeRecordConditionFailed()!=null) + r.setRecordType(org.apache.juddi.model.ChangeRecord.RecordType.ChangeRecordConditionFailed); + else if (rec.getChangeRecordCorrection()!=null) + r.setRecordType(org.apache.juddi.model.ChangeRecord.RecordType.ChangeRecordCorrection); + else if (rec.getChangeRecordDelete()!=null) + r.setRecordType(org.apache.juddi.model.ChangeRecord.RecordType.ChangeRecordDelete); + else if (rec.getChangeRecordDeleteAssertion()!=null) + r.setRecordType(org.apache.juddi.model.ChangeRecord.RecordType.ChangeRecordDeleteAssertion); + else if (rec.getChangeRecordHide()!=null) + r.setRecordType(org.apache.juddi.model.ChangeRecord.RecordType.ChangeRecordHide); + else if (rec.getChangeRecordNewDataConditional()!=null) + r.setRecordType(org.apache.juddi.model.ChangeRecord.RecordType.ChangeRecordNewDataConditional); + else if (rec.getChangeRecordNull()!=null) + r.setRecordType(org.apache.juddi.model.ChangeRecord.RecordType.ChangeRecordNull); + else if (rec.getChangeRecordPublisherAssertion()!=null) + 
r.setRecordType(org.apache.juddi.model.ChangeRecord.RecordType.ChangeRecordPublisherAssertion); r.setNodeID(rec.getChangeID().getNodeID()); StringWriter sw = new StringWriter(); JAXB.marshal(rec, sw); @@ -1406,6 +1428,30 @@ public class MappingApiToModel { model.setNodeId(operationalInfo.getNodeID()); } + + public static void mapOperationalInfoIncludingChildren(BusinessEntity model, OperationalInfo operationalInfo) { + if (operationalInfo == null || model == null) { + return; + } + if (operationalInfo.getCreated() != null) { + model.setCreated(operationalInfo.getCreated().toGregorianCalendar().getTime()); + } + model.setAuthorizedName(operationalInfo.getAuthorizedName()); + if (operationalInfo.getModified() != null) { + model.setModified(operationalInfo.getModified().toGregorianCalendar().getTime()); + } + if (operationalInfo.getModifiedIncludingChildren() != null) { + model.setModifiedIncludingChildren(operationalInfo.getModifiedIncludingChildren().toGregorianCalendar().getTime()); + } + model.setNodeId(operationalInfo.getNodeID()); + for (int i=0; i < model.getBusinessServices().size(); i++) + { + mapOperationalInfo( model.getBusinessServices().get(i), operationalInfo); + for (int k=0; k < model.getBusinessServices().get(i).getBindingTemplates().size(); k++) + mapOperationalInfo( model.getBusinessServices().get(i).getBindingTemplates().get(k), operationalInfo); + } + + } public static void mapSaveBindingToChangeRecord(SaveBinding recordIn, List<org.apache.juddi.model.ChangeRecord> recordsOut) { List<org.apache.juddi.model.ChangeRecord> r = new ArrayList<org.apache.juddi.model.ChangeRecord>(); @@ -1582,5 +1628,26 @@ public class MappingApiToModel { } } + public static void mapOperationalInfoIncludingChildren(BusinessService model, OperationalInfo operationalInfo) { + if (operationalInfo == null || model == null) { + return; + } + if (operationalInfo.getCreated() != null) { + model.setCreated(operationalInfo.getCreated().toGregorianCalendar().getTime()); + } + 
model.setAuthorizedName(operationalInfo.getAuthorizedName()); + if (operationalInfo.getModified() != null) { + model.setModified(operationalInfo.getModified().toGregorianCalendar().getTime()); + } + if (operationalInfo.getModifiedIncludingChildren() != null) { + model.setModifiedIncludingChildren(operationalInfo.getModifiedIncludingChildren().toGregorianCalendar().getTime()); + } + model.setNodeId(operationalInfo.getNodeID()); + + for (int k=0; k < model.getBindingTemplates().size(); k++) + mapOperationalInfo( model.getBindingTemplates().get(k), operationalInfo); + + } + } http://git-wip-us.apache.org/repos/asf/juddi/blob/939ae47f/juddi-core/src/main/java/org/apache/juddi/replication/ReplicationNotifier.java ---------------------------------------------------------------------- diff --git a/juddi-core/src/main/java/org/apache/juddi/replication/ReplicationNotifier.java b/juddi-core/src/main/java/org/apache/juddi/replication/ReplicationNotifier.java index 132c530..d71f2bd 100644 --- a/juddi-core/src/main/java/org/apache/juddi/replication/ReplicationNotifier.java +++ b/juddi-core/src/main/java/org/apache/juddi/replication/ReplicationNotifier.java @@ -59,8 +59,8 @@ public class ReplicationNotifier extends TimerTask { private static Log log = LogFactory.getLog(ReplicationNotifier.class); private Timer timer = null; private long startBuffer = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_START_BUFFER, 20000l); // 20s startup delay default - private long interval = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 300000l); //5 min default - private long acceptableLagTime = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_ACCEPTABLE_LAGTIME, 1000l); //1000 milliseconds + private long interval = 5000;//AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 300000l); //5 min default + private long acceptableLagTime = 
AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_ACCEPTABLE_LAGTIME, 10000l); //1000 milliseconds private static String node = null; /** http://git-wip-us.apache.org/repos/asf/juddi/blob/939ae47f/juddi-core/src/main/java/org/apache/juddi/subscription/SubscriptionNotifier.java ---------------------------------------------------------------------- diff --git a/juddi-core/src/main/java/org/apache/juddi/subscription/SubscriptionNotifier.java b/juddi-core/src/main/java/org/apache/juddi/subscription/SubscriptionNotifier.java index 933f476..3dc0771 100644 --- a/juddi-core/src/main/java/org/apache/juddi/subscription/SubscriptionNotifier.java +++ b/juddi-core/src/main/java/org/apache/juddi/subscription/SubscriptionNotifier.java @@ -55,400 +55,417 @@ import org.uddi.subr_v3.NotifySubscriptionListener; import org.uddi.v3_service.DispositionReportFaultMessage; /** - * + * * @author <a href="mailto:[email protected]">Kurt T Stam</a> * */ public class SubscriptionNotifier extends TimerTask { - private Log log = LogFactory.getLog(this.getClass()); - private Timer timer = null; - private long startBuffer = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_START_BUFFER, 20000l); // 20s startup delay default - private long interval = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 300000l); //5 min default - private long acceptableLagTime = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_ACCEPTABLE_LAGTIME, 1000l); //1000 milliseconds - private int maxTries = AppConfig.getConfiguration().getInt(Property.JUDDI_NOTIFICATION_MAX_TRIES, 3); - private long badListResetInterval = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_LIST_RESET_INTERVAL, 1000l * 3600); //one hour + private Log log = LogFactory.getLog(this.getClass()); + private Timer timer = null; + private long startBuffer = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_START_BUFFER, 20000l); // 20s startup delay 
default + private long interval = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 300000l); //5 min default + private long acceptableLagTime = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_ACCEPTABLE_LAGTIME, 1000l); //1000 milliseconds + private int maxTries = AppConfig.getConfiguration().getInt(Property.JUDDI_NOTIFICATION_MAX_TRIES, 3); + private long badListResetInterval = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_LIST_RESET_INTERVAL, 1000l * 3600); //one hour /** * @since 3.2 */ - private boolean sendToken = AppConfig.getConfiguration().getBoolean(Property.JUDDI_NOTIFICATION_SENDAUTHTOKEN, false); - private UDDISubscriptionImpl subscriptionImpl = new UDDISubscriptionImpl(); - private Boolean alwaysNotify = false; - private Date desiredDate = null; - private int lastUpdateCounter; - private UDDIServiceCounter serviceCounter = ServiceCounterLifecycleResource.getServiceCounter(UDDIPublicationImpl.class); - private String[] attributes = { - "save_business", "save_service", "save_binding", "save_tmodel", - "delete_business","delete_service","delete_binding","delete_tmodel", - "add_publisherassertions","set_publisherassertions","delete_publisherassertions" - }; - private static Map<String,Integer> badNotifications= new ConcurrentHashMap<String,Integer>(); - private static Date lastBadNotificationReset = new Date(); - + private boolean sendToken = AppConfig.getConfiguration().getBoolean(Property.JUDDI_NOTIFICATION_SENDAUTHTOKEN, false); + private UDDISubscriptionImpl subscriptionImpl = new UDDISubscriptionImpl(); + private Boolean alwaysNotify = false; + private Date desiredDate = null; + private int lastUpdateCounter; + private UDDIServiceCounter serviceCounter = ServiceCounterLifecycleResource.getServiceCounter(UDDIPublicationImpl.class); + private String[] attributes = { + "save_business", "save_service", "save_binding", "save_tmodel", + "delete_business", "delete_service", "delete_binding", 
"delete_tmodel", + "add_publisherassertions", "set_publisherassertions", "delete_publisherassertions" + }; + private static Map<String, Integer> badNotifications = new ConcurrentHashMap<String, Integer>(); + private static Date lastBadNotificationReset = new Date(); + /** * default constructor - * @throws ConfigurationException + * + * @throws ConfigurationException + */ + public SubscriptionNotifier() throws ConfigurationException { + super(); + timer = new Timer(true); + timer.scheduleAtFixedRate(this, startBuffer, interval); + } + + @Override + public boolean cancel() { + timer.cancel(); + return super.cancel(); + } + + /** + * If the CRUD methods on the publication API where not called, this + * registry node does not contain changes. If the registry database is + * shared with other registry nodes and one of those registries pushed + * in a change, then that registry node will take care of sending out + * notifications. + * + * @return true/false */ - public SubscriptionNotifier() throws ConfigurationException { - super(); - timer = new Timer(true); - timer.scheduleAtFixedRate(this, startBuffer, interval); - } - - @Override - public boolean cancel() { - timer.cancel(); - return super.cancel(); - } - - /** - * If the CRUD methods on the publication API where not called, this registry node does not contain changes. If - * the registry database is shared with other registry nodes and one of those registries pushed in a change, then - * that registry node will take care of sending out notifications. - * @return true/false - */ - protected boolean registryMayContainUpdates() { - boolean isUpdated = false; - int updateCounter = 0; + protected boolean registryMayContainUpdates() { + boolean isUpdated = false; + int updateCounter = 0; //if the desiredDate is set it means that we've declined sending out a notification before - //because the a client did not want a notification yet. 
However if this desired - //notification time has come we should try sending out the notification now. - if (desiredDate!=null && new Date().getTime() > desiredDate.getTime()) { - return true; - } - try { - for (String attribute : attributes) { - String counter = serviceCounter.getAttribute(attribute + " successful queries").toString(); - updateCounter += Integer.valueOf(counter); - } + //because the a client did not want a notification yet. However if this desired + //notification time has come we should try sending out the notification now. + if (desiredDate != null && new Date().getTime() > desiredDate.getTime()) { + return true; + } + try { + for (String attribute : attributes) { + String counter = serviceCounter.getAttribute(attribute + " successful queries").toString(); + updateCounter += Integer.valueOf(counter); + } // if the counts are not the same something has changed, - // this accounts for the case where the counters where reset. - if (updateCounter != lastUpdateCounter) { - lastUpdateCounter = updateCounter; - isUpdated = true; - } - } catch (Exception e) { - log.error(e.getMessage(),e); - } - return isUpdated; - } + // this accounts for the case where the counters where reset. 
+ if (updateCounter != lastUpdateCounter) { + lastUpdateCounter = updateCounter; + isUpdated = true; + } + } catch (Exception e) { + log.error(e.getMessage(), e); + } + return isUpdated; + } + + public synchronized void run() { + if (badListResetInterval > 0 && new Date().getTime() > lastBadNotificationReset.getTime() + badListResetInterval) { + badNotifications = new ConcurrentHashMap<String, Integer>(); + lastBadNotificationReset = new Date(); + log.debug("badNotificationList was reset"); + } + if ((firedOnTime(scheduledExecutionTime()) || alwaysNotify) && registryMayContainUpdates()) { + long startTime = System.currentTimeMillis(); + desiredDate = null; + log.info("Start Notification background task; checking if subscription notifications need to be send out.."); - public synchronized void run() - { - if (badListResetInterval > 0 && new Date().getTime() > lastBadNotificationReset.getTime() + badListResetInterval) { - badNotifications = new ConcurrentHashMap<String,Integer>(); - lastBadNotificationReset = new Date(); - log.debug("badNotificationList was reset"); - } - if ((firedOnTime(scheduledExecutionTime()) || alwaysNotify) && registryMayContainUpdates()) { - long startTime = System.currentTimeMillis(); - desiredDate = null; - log.info("Start Notification background task; checking if subscription notifications need to be send out.."); - - Collection<Subscription> subscriptions = getAllAsyncSubscriptions(); - for (Subscription subscription : subscriptions) { - - - if (subscription.getExpiresAfter()==null || subscription.getExpiresAfter().getTime() > startTime || - !isTemporarilyDisabled(subscription.getSubscriptionKey())) { - try { + Collection<Subscription> subscriptions = getAllAsyncSubscriptions(); + for (Subscription subscription : subscriptions) { + + if (subscription.getExpiresAfter() == null || subscription.getExpiresAfter().getTime() > startTime + || !isTemporarilyDisabled(subscription.getSubscriptionKey())) { + try { //build a query with a coverage 
period from the lastNotified time to - //now (the scheduled Execution time) - Date notificationDate = new Date(scheduledExecutionTime()); - GetSubscriptionResults getSubscriptionResults = - buildGetSubscriptionResults(subscription, notificationDate); - if (getSubscriptionResults!=null) { - getSubscriptionResults.setSubscriptionKey(subscription.getSubscriptionKey()); - UddiEntityPublisher publisher = new UddiEntityPublisher(); - publisher.setAuthorizedName(subscription.getAuthorizedName()); - SubscriptionResultsList resultList = subscriptionImpl.getSubscriptionResults(getSubscriptionResults, publisher); + //now (the scheduled Execution time) + Date notificationDate = new Date(scheduledExecutionTime()); + GetSubscriptionResults getSubscriptionResults + = buildGetSubscriptionResults(subscription, notificationDate); + if (getSubscriptionResults != null) { + getSubscriptionResults.setSubscriptionKey(subscription.getSubscriptionKey()); + UddiEntityPublisher publisher = new UddiEntityPublisher(); + publisher.setAuthorizedName(subscription.getAuthorizedName()); + SubscriptionResultsList resultList = subscriptionImpl.getSubscriptionResults(getSubscriptionResults, publisher); String token = resultList.getChunkToken(); - if (resultListContainsChanges(resultList)) { - log.info("We have a change and need to notify " + subscription.getSubscriptionKey()); + if (resultListContainsChanges(resultList)) { + log.info("We have a change and need to notify " + subscription.getSubscriptionKey()); resultList.setChunkToken(null); - //Note that the chunkToken is not returned with this structure for this API. 
- notify(getSubscriptionResults,resultList, notificationDate); - } else { - log.info("No changes where recorded, no need to notify."); - } - while (!token.equalsIgnoreCase("0")) - { - resultList = subscriptionImpl.getSubscriptionResults(getSubscriptionResults, publisher); - if (resultListContainsChanges(resultList)) { - log.info("We have a change and need to notify " + subscription.getSubscriptionKey()); - resultList.setChunkToken(null); - //Note that the chunkToken is not returned with this structure for this API. - notify(getSubscriptionResults,resultList, notificationDate); - } else { - log.info("No changes where recorded, no need to notify."); - } + //Note that the chunkToken is not returned with this structure for this API. + notify(getSubscriptionResults, resultList, notificationDate); + } else { + log.info("No changes where recorded, no need to notify."); + } + while (!token.equalsIgnoreCase("0")) { + resultList = subscriptionImpl.getSubscriptionResults(getSubscriptionResults, publisher); + if (resultListContainsChanges(resultList)) { + log.info("We have a change and need to notify " + subscription.getSubscriptionKey()); + resultList.setChunkToken(null); + //Note that the chunkToken is not returned with this structure for this API. + notify(getSubscriptionResults, resultList, notificationDate); + } else { + log.info("No changes where recorded, no need to notify."); + } } - - } - } catch (Exception e) { - log.error("Could not obtain subscriptionResult for subscriptionKey " - + subscription.getSubscriptionKey() + ". 
" + e.getMessage(),e); - } - } else { - // the subscription expired, we should delete it - log.info("Subcription with key " + subscription.getSubscriptionKey() - + " expired " + subscription.getExpiresAfter()); - deleteSubscription(subscription); - } - } - long endTime = System.currentTimeMillis(); - - if ((endTime-startTime) > interval) { - log.info("Notification background task duration exceeds the JUDDI_NOTIFICATION_INTERVAL" + - " of " + interval + ". Notification background task took " - + (endTime - startTime) + " milliseconds."); - } else { - log.info("Notification background task took " + (endTime - startTime) + " milliseconds."); - } - } else { - log.warn("Skipping current notification cycle because lagtime is too great."); - } - } - /** - * Checks to see that the event are fired on time. If they are late this may indicate that the server - * is under load. The acceptableLagTime is configurable using the "juddi.notification.acceptable.lagtime" - * property and is defaulted to 500ms. A negative value means that you do not care about the lag time - * and you simply always want to go do the notification work. - * - * @param scheduleExecutionTime - * @return true if the server is within the acceptable latency lag. - */ - private boolean firedOnTime(long scheduleExecutionTime) { - long lagTime = System.currentTimeMillis() - scheduleExecutionTime; - if (lagTime <= acceptableLagTime || acceptableLagTime < 0) { - return true; - } else { - log.debug("NotificationTimer is lagging " + lagTime + " milli seconds behind. A lag time " - + "which exceeds an acceptable lagtime of " + acceptableLagTime + "ms indicates " - + "that the registry server is under load or was in sleep mode. 
We are therefore skipping this notification " - + "cycle."); - return false; - } - } - protected GetSubscriptionResults buildGetSubscriptionResults(Subscription subscription, Date endPoint) - throws DispositionReportFaultMessage, DatatypeConfigurationException { - - GetSubscriptionResults getSubscriptionResults = null; - Duration duration = TypeConvertor.convertStringToDuration(subscription.getNotificationInterval()); - Date startPoint = subscription.getLastNotified(); - Date nextDesiredNotificationDate = null; - if (startPoint==null) startPoint = subscription.getCreateDate(); - nextDesiredNotificationDate = new Date(startPoint.getTime()); - duration.addTo(nextDesiredNotificationDate); + + } + } catch (Exception e) { + log.error("Could not obtain subscriptionResult for subscriptionKey " + + subscription.getSubscriptionKey() + ". " + e.getMessage(), e); + } + } else { + // the subscription expired, we should delete it + log.info("Subcription with key " + subscription.getSubscriptionKey() + + " expired " + subscription.getExpiresAfter()); + deleteSubscription(subscription); + } + } + long endTime = System.currentTimeMillis(); + + if ((endTime - startTime) > interval) { + log.info("Notification background task duration exceeds the JUDDI_NOTIFICATION_INTERVAL" + + " of " + interval + ". Notification background task took " + + (endTime - startTime) + " milliseconds."); + } else { + log.info("Notification background task took " + (endTime - startTime) + " milliseconds."); + } + } else { + log.debug("Skipping current notification cycle because lagtime is too great."); + } + } + + /** + * Checks to see that the event are fired on time. If they are late this + * may indicate that the server is under load. The acceptableLagTime is + * configurable using the "juddi.notification.acceptable.lagtime" + * property and is defaulted to 500ms. A negative value means that you + * do not care about the lag time and you simply always want to go do + * the notification work. 
+ * + * @param scheduleExecutionTime + * @return true if the server is within the acceptable latency lag. + */ + private boolean firedOnTime(long scheduleExecutionTime) { + long lagTime = System.currentTimeMillis() - scheduleExecutionTime; + if (lagTime <= acceptableLagTime || acceptableLagTime < 0) { + return true; + } else { + log.debug("NotificationTimer is lagging " + lagTime + " milli seconds behind. A lag time " + + "which exceeds an acceptable lagtime of " + acceptableLagTime + "ms indicates " + + "that the registry server is under load or was in sleep mode. We are therefore skipping this notification " + + "cycle."); + return false; + } + } + + protected GetSubscriptionResults buildGetSubscriptionResults(Subscription subscription, Date endPoint) + throws DispositionReportFaultMessage, DatatypeConfigurationException { + + GetSubscriptionResults getSubscriptionResults = null; + Duration duration = TypeConvertor.convertStringToDuration(subscription.getNotificationInterval()); + Date startPoint = subscription.getLastNotified(); + Date nextDesiredNotificationDate = null; + if (startPoint == null) { + startPoint = subscription.getCreateDate(); + } + nextDesiredNotificationDate = new Date(startPoint.getTime()); + duration.addTo(nextDesiredNotificationDate); //nextDesiredNotificationDate = lastTime + the Interval Duration, which should be: - //AFTER the lastNotified time and BEFORE the endTime (current time). If it is - //after the endTime, then the user does not want a notification yet, so we accumulate. 
- if (subscription.getLastNotified()==null || nextDesiredNotificationDate.after(startPoint) && nextDesiredNotificationDate.before(endPoint)) { - getSubscriptionResults = new GetSubscriptionResults(); - CoveragePeriod period = new CoveragePeriod(); - GregorianCalendar calendar = new GregorianCalendar(); - calendar.setTimeInMillis(startPoint.getTime()); - period.setStartPoint(DatatypeFactory.newInstance().newXMLGregorianCalendar(calendar)); - calendar.setTimeInMillis(endPoint.getTime()); - period.setEndPoint(DatatypeFactory.newInstance().newXMLGregorianCalendar(calendar)); - if (log.isDebugEnabled()) log.debug("Period " + period.getStartPoint() + " " + period.getEndPoint()); - getSubscriptionResults.setCoveragePeriod(period); - } else { - log.info("Client does not yet want a notification. The next desidered notification Date " + nextDesiredNotificationDate + ". The current interval [" - + startPoint + " , " + endPoint + "] therefore skipping this notification cycle."); - if (desiredDate==null || nextDesiredNotificationDate.getTime() < desiredDate.getTime()) { - desiredDate = nextDesiredNotificationDate; - } - } - return getSubscriptionResults; - - } - - protected boolean resultListContainsChanges(SubscriptionResultsList resultList) - { - if (resultList==null) return false; - if (resultList.getBindingDetail() !=null || resultList.getBusinessDetail()!=null - || resultList.getBusinessList() !=null || resultList.getServiceDetail() !=null - || resultList.getServiceList() !=null || resultList.getTModelDetail() !=null - || resultList.getTModelList() !=null || resultList.getRelatedBusinessesList() !=null) { - return true; - } - //When the response is 'brief', or when there are deleted only keyBags are used. - if (resultList.getKeyBag()!=null && resultList.getKeyBag().size() > 0) return true; - //there are no changes to what was subscribed to - return false; - } - /** - * Obtains all subscriptions in the system. - * @return Collection of All Subscriptions in the system. 
- */ - @SuppressWarnings("unchecked") - protected Collection<Subscription> getAllAsyncSubscriptions() { - Collection<Subscription> subscriptions = null; - EntityManager em = PersistenceManager.getEntityManager(); - EntityTransaction tx = em.getTransaction(); - try { - tx.begin(); - Query query = em.createQuery("SELECT s FROM Subscription s WHERE s.bindingKey IS NOT NULL"); - subscriptions = (Collection<Subscription>) query.getResultList(); - tx.commit(); - } finally { - if (tx.isActive()) { - tx.rollback(); - } - em.close(); - } - return subscriptions; - } - /** - * Deletes the subscription. i.e. when it is expired. - * @param subscription - */ - protected void deleteSubscription(Subscription subscription) { - EntityManager em = PersistenceManager.getEntityManager(); - EntityTransaction tx = em.getTransaction(); - try { - tx.begin(); - em.remove(subscription); - tx.commit(); - } finally { - if (tx.isActive()) { - tx.rollback(); - } - em.close(); - } - } - /** - * Sends out the notifications. + //AFTER the lastNotified time and BEFORE the endTime (current time). If it is + //after the endTime, then the user does not want a notification yet, so we accumulate. + if (subscription.getLastNotified() == null || nextDesiredNotificationDate.after(startPoint) && nextDesiredNotificationDate.before(endPoint)) { + getSubscriptionResults = new GetSubscriptionResults(); + CoveragePeriod period = new CoveragePeriod(); + GregorianCalendar calendar = new GregorianCalendar(); + calendar.setTimeInMillis(startPoint.getTime()); + period.setStartPoint(DatatypeFactory.newInstance().newXMLGregorianCalendar(calendar)); + calendar.setTimeInMillis(endPoint.getTime()); + period.setEndPoint(DatatypeFactory.newInstance().newXMLGregorianCalendar(calendar)); + if (log.isDebugEnabled()) { + log.debug("Period " + period.getStartPoint() + " " + period.getEndPoint()); + } + getSubscriptionResults.setCoveragePeriod(period); + } else { + log.info("Client does not yet want a notification. 
The next desidered notification Date " + nextDesiredNotificationDate + ". The current interval [" + + startPoint + " , " + endPoint + "] therefore skipping this notification cycle."); + if (desiredDate == null || nextDesiredNotificationDate.getTime() < desiredDate.getTime()) { + desiredDate = nextDesiredNotificationDate; + } + } + return getSubscriptionResults; + + } + + protected boolean resultListContainsChanges(SubscriptionResultsList resultList) { + if (resultList == null) { + return false; + } + if (resultList.getBindingDetail() != null || resultList.getBusinessDetail() != null + || resultList.getBusinessList() != null || resultList.getServiceDetail() != null + || resultList.getServiceList() != null || resultList.getTModelDetail() != null + || resultList.getTModelList() != null || resultList.getRelatedBusinessesList() != null) { + return true; + } + //When the response is 'brief', or when there are deleted only keyBags are used. + if (resultList.getKeyBag() != null && resultList.getKeyBag().size() > 0) { + return true; + } + //there are no changes to what was subscribed to + return false; + } + + /** + * Obtains all subscriptions in the system. + * + * @return Collection of All Subscriptions in the system. + */ + @SuppressWarnings("unchecked") + protected Collection<Subscription> getAllAsyncSubscriptions() { + Collection<Subscription> subscriptions = null; + EntityManager em = PersistenceManager.getEntityManager(); + EntityTransaction tx = em.getTransaction(); + try { + tx.begin(); + Query query = em.createQuery("SELECT s FROM Subscription s WHERE s.bindingKey IS NOT NULL"); + subscriptions = (Collection<Subscription>) query.getResultList(); + tx.commit(); + } finally { + if (tx.isActive()) { + tx.rollback(); + } + em.close(); + } + return subscriptions; + } + + /** + * Deletes the subscription. i.e. when it is expired. 
+ * + * @param subscription + */ + protected void deleteSubscription(Subscription subscription) { + EntityManager em = PersistenceManager.getEntityManager(); + EntityTransaction tx = em.getTransaction(); + try { + tx.begin(); + em.remove(subscription); + tx.commit(); + } finally { + if (tx.isActive()) { + tx.rollback(); + } + em.close(); + } + } + + /** + * Sends out the notifications. + * * @param getSubscriptionResults - * @param resultList - * @param notificationDate - */ - protected void notify(GetSubscriptionResults getSubscriptionResults, SubscriptionResultsList resultList, Date notificationDate) - { - EntityManager em = PersistenceManager.getEntityManager(); - EntityTransaction tx = em.getTransaction(); - try { - String subscriptionKey = resultList.getSubscription().getSubscriptionKey(); - org.apache.juddi.model.Subscription modelSubscription = - em.find(org.apache.juddi.model.Subscription.class, subscriptionKey); - Date lastNotifiedDate = modelSubscription.getLastNotified(); - //now log to the db that we are sending the notification. 
- tx.begin(); - modelSubscription.setLastNotified(notificationDate); - em.persist(modelSubscription); - tx.commit(); - - org.apache.juddi.model.BindingTemplate bindingTemplate= em.find(org.apache.juddi.model.BindingTemplate.class, modelSubscription.getBindingKey()); - NotifySubscriptionListener body = new NotifySubscriptionListener(); + * @param resultList + * @param notificationDate + */ + protected void notify(GetSubscriptionResults getSubscriptionResults, SubscriptionResultsList resultList, Date notificationDate) { + EntityManager em = PersistenceManager.getEntityManager(); + EntityTransaction tx = em.getTransaction(); + try { + String subscriptionKey = resultList.getSubscription().getSubscriptionKey(); + org.apache.juddi.model.Subscription modelSubscription + = em.find(org.apache.juddi.model.Subscription.class, subscriptionKey); + Date lastNotifiedDate = modelSubscription.getLastNotified(); + //now log to the db that we are sending the notification. + tx.begin(); + modelSubscription.setLastNotified(notificationDate); + em.persist(modelSubscription); + tx.commit(); + + org.apache.juddi.model.BindingTemplate bindingTemplate = em.find(org.apache.juddi.model.BindingTemplate.class, modelSubscription.getBindingKey()); + NotifySubscriptionListener body = new NotifySubscriptionListener(); // if (resultList.getServiceList()!=null && resultList.getServiceList().getServiceInfos()!=null && // resultList.getServiceList().getServiceInfos().getServiceInfo().size() == 0) { // resultList.getServiceList().setServiceInfos(null); // } - body.setSubscriptionResultsList(resultList); - + body.setSubscriptionResultsList(resultList); + //TODO if the endpoint requires an auth token, look up the security endpoint of the remote registry //via ClientSubscriptionInfo - - if (sendToken) - { - String authorizedName = modelSubscription.getAuthorizedName(); - UDDISecurityImpl security = new UDDISecurityImpl(); + if (sendToken) { + String authorizedName = modelSubscription.getAuthorizedName(); + 
UDDISecurityImpl security = new UDDISecurityImpl(); - if (authorizedName != null) { // add a security token if needed - try { - //obtain a token for this publisher - org.uddi.api_v3.AuthToken token = security.getAuthToken(authorizedName); - body.setAuthInfo(token.getAuthInfo()); - } catch (DispositionReportFaultMessage e) { - body.setAuthInfo("Failed to generate token, please contact UDDI admin"); - log.error(e.getMessage(),e); - } - } - } - - if (bindingTemplate!=null) { - if (AccessPointType.END_POINT.toString().equalsIgnoreCase(bindingTemplate.getAccessPointType()) || - AccessPointType.WSDL_DEPLOYMENT.toString().equalsIgnoreCase(bindingTemplate.getAccessPointType())) { - try { - Notifier notifier = new NotifierFactory().getNotifier(bindingTemplate); - if (notifier!=null) { - log.info("Sending out notification to " + bindingTemplate.getAccessPointUrl()); - notifier.notifySubscriptionListener(body); - //there maybe more chunks we have to send - String chunkToken=body.getSubscriptionResultsList().getChunkToken(); - while(chunkToken!=null) { - UddiEntityPublisher publisher = new UddiEntityPublisher(); - publisher.setAuthorizedName(modelSubscription.getAuthorizedName()); - log.debug("Sending out next chunk: " + chunkToken + " to " + bindingTemplate.getAccessPointUrl()); - getSubscriptionResults.setChunkToken(chunkToken); - resultList = subscriptionImpl.getSubscriptionResults(getSubscriptionResults, publisher); - body.setSubscriptionResultsList(resultList); - if (resultListContainsChanges(resultList)) - //if (!IsEmpty(resultList)) - notifier.notifySubscriptionListener(body); - chunkToken=body.getSubscriptionResultsList().getChunkToken(); - } - //successful notification so remove from the badNotificationList - if (badNotifications.containsKey(resultList.getSubscription().getSubscriptionKey())) - badNotifications.remove(resultList.getSubscription().getSubscriptionKey()); - } + if (authorizedName != null) { // add a security token if needed + try { + //obtain a token for 
this publisher + org.uddi.api_v3.AuthToken token = security.getAuthToken(authorizedName); + body.setAuthInfo(token.getAuthInfo()); + } catch (DispositionReportFaultMessage e) { + body.setAuthInfo("Failed to generate token, please contact UDDI admin"); + log.error(e.getMessage(), e); } - catch (Exception e) { + } + } + + if (bindingTemplate != null) { + if (AccessPointType.END_POINT.toString().equalsIgnoreCase(bindingTemplate.getAccessPointType()) + || AccessPointType.WSDL_DEPLOYMENT.toString().equalsIgnoreCase(bindingTemplate.getAccessPointType())) { + try { + Notifier notifier = new NotifierFactory().getNotifier(bindingTemplate); + if (notifier != null) { + log.info("Sending out notification to " + bindingTemplate.getAccessPointUrl()); + notifier.notifySubscriptionListener(body); + //there maybe more chunks we have to send + String chunkToken = body.getSubscriptionResultsList().getChunkToken(); + while (chunkToken != null) { + UddiEntityPublisher publisher = new UddiEntityPublisher(); + publisher.setAuthorizedName(modelSubscription.getAuthorizedName()); + log.debug("Sending out next chunk: " + chunkToken + " to " + bindingTemplate.getAccessPointUrl()); + getSubscriptionResults.setChunkToken(chunkToken); + resultList = subscriptionImpl.getSubscriptionResults(getSubscriptionResults, publisher); + body.setSubscriptionResultsList(resultList); + if (resultListContainsChanges(resultList)) //if (!IsEmpty(resultList)) + { + notifier.notifySubscriptionListener(body); + } + chunkToken = body.getSubscriptionResultsList().getChunkToken(); + } + //successful notification so remove from the badNotificationList + if (badNotifications.containsKey(resultList.getSubscription().getSubscriptionKey())) { + badNotifications.remove(resultList.getSubscription().getSubscriptionKey()); + } + } + } catch (Exception e) { if (e.getCause() instanceof IOException) { - addBadNotificationToList(subscriptionKey, bindingTemplate.getAccessPointUrl()); - //we could not notify so compensate the 
transaction above - modelSubscription.setLastNotified(lastNotifiedDate); - tx.begin(); - em.persist(modelSubscription); - tx.commit(); + addBadNotificationToList(subscriptionKey, bindingTemplate.getAccessPointUrl()); + //we could not notify so compensate the transaction above + modelSubscription.setLastNotified(lastNotifiedDate); + tx.begin(); + em.persist(modelSubscription); + tx.commit(); //} else { - //log.warn("Unexpected WebServiceException " + e.getMessage() + e.getCause()); - } - log.error("Unexpected notification exception:" +e.getClass().getCanonicalName() + " " + e.getMessage() +" "+ e.getCause()); - log.debug("Unexpected notification exception:" +e.getClass().getCanonicalName() + " "+ e.getMessage() + " "+e.getCause(),e); - } - } else { - log.info("Binding " + bindingTemplate.getEntityKey() + " has an unsupported binding type of " - + bindingTemplate.getAccessPointType() + ". Only " - + AccessPointType.END_POINT.toString() + " and " - + AccessPointType.WSDL_DEPLOYMENT.toString() + " are supported."); - addBadNotificationToList(subscriptionKey, bindingTemplate.getAccessPointType() + " not supported"); - } - } else { - log.info("There is no valid binding template defined for this subscription: " + modelSubscription.getBindingKey()); - addBadNotificationToList(subscriptionKey, modelSubscription.getBindingKey() + " not found"); - } - - } finally { - if (tx.isActive()) { - tx.rollback(); - } - em.close(); - } - } + //log.warn("Unexpected WebServiceException " + e.getMessage() + e.getCause()); + } + log.error("Unexpected notification exception:" + e.getClass().getCanonicalName() + " " + e.getMessage() + " " + e.getCause()); + log.debug("Unexpected notification exception:" + e.getClass().getCanonicalName() + " " + e.getMessage() + " " + e.getCause(), e); + } + } else { + log.info("Binding " + bindingTemplate.getEntityKey() + " has an unsupported binding type of " + + bindingTemplate.getAccessPointType() + ". 
Only " + + AccessPointType.END_POINT.toString() + " and " + + AccessPointType.WSDL_DEPLOYMENT.toString() + " are supported."); + addBadNotificationToList(subscriptionKey, bindingTemplate.getAccessPointType() + " not supported"); + } + } else { + log.info("There is no valid binding template defined for this subscription: " + modelSubscription.getBindingKey()); + addBadNotificationToList(subscriptionKey, modelSubscription.getBindingKey() + " not found"); + } + + } finally { + if (tx.isActive()) { + tx.rollback(); + } + em.close(); + } + } + + protected UDDISubscriptionImpl getSubscriptionImpl() { + return subscriptionImpl; + } + + private boolean isTemporarilyDisabled(String subscriptionKey) { + if (maxTries > 0 && badNotifications.containsKey(subscriptionKey) && badNotifications.get(subscriptionKey) > maxTries) { + log.debug("Subscription " + subscriptionKey + " is temperarily disabled. The notification endpoint" + + " could not be reached more then " + maxTries + " times"); + return true; + } + return false; + } - protected UDDISubscriptionImpl getSubscriptionImpl() { - return subscriptionImpl; - } - - private boolean isTemporarilyDisabled(String subscriptionKey) { - if (maxTries > 0 && badNotifications.containsKey(subscriptionKey) && badNotifications.get(subscriptionKey) > maxTries ) { - log.debug("Subscription " + subscriptionKey + " is temperarily disabled. 
The notification endpoint" + - " could not be reached more then " + maxTries + " times"); - return true; - } - return false; - } - - private int addBadNotificationToList(String subscriptionKey, String endPoint) { - Integer numberOfBadNotifications = 0; - if (badNotifications.containsKey(subscriptionKey)) - numberOfBadNotifications = badNotifications.get(subscriptionKey); - badNotifications.put(subscriptionKey, ++numberOfBadNotifications); - log.debug("bad notification number " + numberOfBadNotifications + " for subscription " - + subscriptionKey + " " + endPoint); - return numberOfBadNotifications; - } + private int addBadNotificationToList(String subscriptionKey, String endPoint) { + Integer numberOfBadNotifications = 0; + if (badNotifications.containsKey(subscriptionKey)) { + numberOfBadNotifications = badNotifications.get(subscriptionKey); + } + badNotifications.put(subscriptionKey, ++numberOfBadNotifications); + log.debug("bad notification number " + numberOfBadNotifications + " for subscription " + + subscriptionKey + " " + endPoint); + return numberOfBadNotifications; + } } http://git-wip-us.apache.org/repos/asf/juddi/blob/939ae47f/juddi-core/src/test/java/org/apache/juddi/api/impl/API_160_ReplicationTest.java ---------------------------------------------------------------------- diff --git a/juddi-core/src/test/java/org/apache/juddi/api/impl/API_160_ReplicationTest.java b/juddi-core/src/test/java/org/apache/juddi/api/impl/API_160_ReplicationTest.java index 98bdb0b..700c96d 100644 --- a/juddi-core/src/test/java/org/apache/juddi/api/impl/API_160_ReplicationTest.java +++ b/juddi-core/src/test/java/org/apache/juddi/api/impl/API_160_ReplicationTest.java @@ -48,6 +48,7 @@ import org.uddi.repl_v3.ChangeRecord; import org.uddi.repl_v3.ChangeRecordIDType; import org.uddi.repl_v3.CommunicationGraph; import org.uddi.repl_v3.DoPing; +import org.uddi.repl_v3.GetChangeRecords; import org.uddi.repl_v3.HighWaterMarkVectorType; import org.uddi.repl_v3.Operator; import 
org.uddi.repl_v3.OperatorStatusType; @@ -126,7 +127,12 @@ public class API_160_ReplicationTest { HighWaterMarkVectorType highWaterMarkVectorType = new HighWaterMarkVectorType(); highWaterMarkVectorType.getHighWaterMark().add(highWaterMarks.get(0)); - List<ChangeRecord> changeRecords = repl.getChangeRecords("test", null, BigInteger.valueOf(highWaterMarks.get(0).getOriginatingUSN()), highWaterMarkVectorType); + GetChangeRecords req = new GetChangeRecords(); + req.setRequestingNode("test"); + req.setChangesAlreadySeen(null); + + req.setResponseLimitVector(highWaterMarkVectorType); + repl.getChangeRecords(req);//"test", null, BigInteger.valueOf(highWaterMarks.get(0).getOriginatingUSN()), highWaterMarkVectorType); Assert.fail("unexpected success"); } http://git-wip-us.apache.org/repos/asf/juddi/blob/939ae47f/juddi-core/src/test/java/org/apache/juddi/api/runtime/CLIServerTest.java ---------------------------------------------------------------------- diff --git a/juddi-core/src/test/java/org/apache/juddi/api/runtime/CLIServerTest.java b/juddi-core/src/test/java/org/apache/juddi/api/runtime/CLIServerTest.java index 4c172b9..dbb5bba 100644 --- a/juddi-core/src/test/java/org/apache/juddi/api/runtime/CLIServerTest.java +++ b/juddi-core/src/test/java/org/apache/juddi/api/runtime/CLIServerTest.java @@ -34,6 +34,7 @@ import org.uddi.api_v3.Contact; import org.uddi.api_v3.PersonName; import org.uddi.repl_v3.CommunicationGraph; import org.uddi.repl_v3.DoPing; +import org.uddi.repl_v3.GetChangeRecords; /** * @@ -101,7 +102,7 @@ public class CLIServerTest { UDDIReplicationPortType juddiApiService = new UDDIService().getUDDIReplicationPort(); ((BindingProvider) juddiApiService).getRequestContext().put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, replUrl); - juddiApiService.getChangeRecords(null, new HighWaterMarkVectorType(), BigInteger.ONE, new HighWaterMarkVectorType()); + juddiApiService.getChangeRecords(new GetChangeRecords()); Assert.assertTrue(sink); sink = false; } 
http://git-wip-us.apache.org/repos/asf/juddi/blob/939ae47f/juddi-core/src/test/java/org/apache/juddi/api/runtime/replicantImpl.java ---------------------------------------------------------------------- diff --git a/juddi-core/src/test/java/org/apache/juddi/api/runtime/replicantImpl.java b/juddi-core/src/test/java/org/apache/juddi/api/runtime/replicantImpl.java index f984747..7d05569 100644 --- a/juddi-core/src/test/java/org/apache/juddi/api/runtime/replicantImpl.java +++ b/juddi-core/src/test/java/org/apache/juddi/api/runtime/replicantImpl.java @@ -21,7 +21,9 @@ import java.util.List; import javax.jws.WebService; import org.uddi.repl_v3.ChangeRecord; import org.uddi.repl_v3.ChangeRecordIDType; +import org.uddi.repl_v3.ChangeRecords; import org.uddi.repl_v3.DoPing; +import org.uddi.repl_v3.GetChangeRecords; import org.uddi.repl_v3.HighWaterMarkVectorType; import org.uddi.repl_v3.NotifyChangeRecordsAvailable; import org.uddi.repl_v3.TransferCustody; @@ -39,11 +41,7 @@ import org.uddi.v3_service.UDDIReplicationPortType; public replicantImpl(){ } - @Override - public List<ChangeRecord> getChangeRecords(String requestingNode, HighWaterMarkVectorType changesAlreadySeen, BigInteger responseLimitCount, HighWaterMarkVectorType responseLimitVector) throws DispositionReportFaultMessage, RemoteException { - CLIServerTest.sink = true; - return null; - } + @Override public void notifyChangeRecordsAvailable(NotifyChangeRecordsAvailable body) throws DispositionReportFaultMessage, RemoteException { @@ -66,5 +64,11 @@ import org.uddi.v3_service.UDDIReplicationPortType; public void transferCustody(TransferCustody body) throws DispositionReportFaultMessage, RemoteException { CLIServerTest.sink = true; } + + @Override + public ChangeRecords getChangeRecords(GetChangeRecords body) throws DispositionReportFaultMessage, RemoteException { + CLIServerTest.sink = true; + return null; + } } 
http://git-wip-us.apache.org/repos/asf/juddi/blob/939ae47f/juddi-examples/more-uddi-samples/src/main/java/org/apache/juddi/samples/UddiReplication.java ---------------------------------------------------------------------- diff --git a/juddi-examples/more-uddi-samples/src/main/java/org/apache/juddi/samples/UddiReplication.java b/juddi-examples/more-uddi-samples/src/main/java/org/apache/juddi/samples/UddiReplication.java index 2e62348..ea6fc52 100644 --- a/juddi-examples/more-uddi-samples/src/main/java/org/apache/juddi/samples/UddiReplication.java +++ b/juddi-examples/more-uddi-samples/src/main/java/org/apache/juddi/samples/UddiReplication.java @@ -20,11 +20,14 @@ import java.rmi.RemoteException; import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; +import javax.xml.bind.JAXB; import javax.xml.ws.BindingProvider; import org.apache.juddi.v3.client.UDDIService; import org.uddi.repl_v3.ChangeRecord; import org.uddi.repl_v3.ChangeRecordIDType; +import org.uddi.repl_v3.ChangeRecords; import org.uddi.repl_v3.DoPing; +import org.uddi.repl_v3.GetChangeRecords; import org.uddi.repl_v3.HighWaterMarkVectorType; import org.uddi.v3_service.UDDIReplicationPortType; @@ -75,7 +78,12 @@ class UddiReplication { highWaterMarkVectorType.getHighWaterMark().add(new ChangeRecordIDType(DoPing(key2), record)); ((BindingProvider) uddiReplicationPort).getRequestContext().put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, key2); - List<ChangeRecord> changeRecords = uddiReplicationPort.getChangeRecords(sourcenode, highWaterMarkVectorType, BigInteger.valueOf(100), null); + GetChangeRecords req = new GetChangeRecords(); + req.setRequestingNode(sourcenode); + req.setChangesAlreadySeen(highWaterMarkVectorType); + req.setResponseLimitCount(BigInteger.valueOf(100)); + ChangeRecords res = uddiReplicationPort.getChangeRecords(req); + List<ChangeRecord> changeRecords = res.getChangeRecord(); System.out.println("Success...." 
+ changeRecords.size() + " records returned"); System.out.println("Node, USN, type"); for (int i = 0; i < changeRecords.size(); i++) { @@ -83,6 +91,7 @@ class UddiReplication { changeRecords.get(i).getChangeID().getNodeID() + ", " + changeRecords.get(i).getChangeID().getOriginatingUSN() + ": " + GetChangeType(changeRecords.get(i))); + JAXB.marshal(changeRecords.get(i), System.out); } } catch (Exception ex) { Logger.getLogger(UddiReplication.class.getName()).log(Level.SEVERE, null, ex); http://git-wip-us.apache.org/repos/asf/juddi/blob/939ae47f/uddi-ws/pom.xml ---------------------------------------------------------------------- diff --git a/uddi-ws/pom.xml b/uddi-ws/pom.xml index 14f53a4..e63f59b 100644 --- a/uddi-ws/pom.xml +++ b/uddi-ws/pom.xml @@ -61,5 +61,11 @@ <type>jar</type> <scope>provided</scope> </dependency> + <dependency> + <scope>test</scope> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-rt-databinding-jaxb</artifactId> + <version>${cxf.version}</version> + </dependency> </dependencies> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/juddi/blob/939ae47f/uddi-ws/src/main/java/org/uddi/repl_v3/ChangeRecordDelete.java ---------------------------------------------------------------------- diff --git a/uddi-ws/src/main/java/org/uddi/repl_v3/ChangeRecordDelete.java b/uddi-ws/src/main/java/org/uddi/repl_v3/ChangeRecordDelete.java index 3e70b67..d7fa1c7 100644 --- a/uddi-ws/src/main/java/org/uddi/repl_v3/ChangeRecordDelete.java +++ b/uddi-ws/src/main/java/org/uddi/repl_v3/ChangeRecordDelete.java @@ -66,13 +66,13 @@ The changeRecordDelete MUST contain a modified timestamp to allow multi-node reg public class ChangeRecordDelete implements Serializable{ @XmlTransient private static final long serialVersionUID = -7081596275330679517L; - @XmlElement(namespace = "urn:uddi-org:api_v3") + @XmlElement(namespace = "urn:uddi-org:repl_v3") protected String businessKey; - @XmlElement(namespace = "urn:uddi-org:api_v3") + 
@XmlElement(namespace = "urn:uddi-org:repl_v3") protected String tModelKey; - @XmlElement(namespace = "urn:uddi-org:api_v3") + @XmlElement(namespace = "urn:uddi-org:repl_v3") protected String serviceKey; - @XmlElement(namespace = "urn:uddi-org:api_v3") + @XmlElement(namespace = "urn:uddi-org:repl_v3") protected String bindingKey; @XmlElement(required = true) protected XMLGregorianCalendar modified; http://git-wip-us.apache.org/repos/asf/juddi/blob/939ae47f/uddi-ws/src/main/java/org/uddi/v3_service/UDDIReplicationPortType.java ---------------------------------------------------------------------- diff --git a/uddi-ws/src/main/java/org/uddi/v3_service/UDDIReplicationPortType.java b/uddi-ws/src/main/java/org/uddi/v3_service/UDDIReplicationPortType.java index 2dda681..684c7a3 100644 --- a/uddi-ws/src/main/java/org/uddi/v3_service/UDDIReplicationPortType.java +++ b/uddi-ws/src/main/java/org/uddi/v3_service/UDDIReplicationPortType.java @@ -155,6 +155,14 @@ public interface UDDIReplicationPortType extends Remote { * code. Error reporting SHALL be that specified by Section 4.8 – Success * and Error Reporting of this specification. 
*/ + @SOAPBinding(parameterStyle = SOAPBinding.ParameterStyle.BARE) + @WebResult(name = "changeRecords", targetNamespace = "urn:uddi-org:repl_v3", partName = "body") + @WebMethod(operationName = "get_changeRecords", action = "get_changeRecords") + public org.uddi.repl_v3.ChangeRecords getChangeRecords( + @WebParam(partName = "body", name = "get_changeRecords", targetNamespace = "urn:uddi-org:repl_v3") + org.uddi.repl_v3.GetChangeRecords body + ) throws DispositionReportFaultMessage,RemoteException; + /* @WebMethod(operationName = "get_changeRecords", action = "get_changeRecords") @WebResult(name = "changeRecord", targetNamespace = "urn:uddi-org:repl_v3") @RequestWrapper(localName = "get_changeRecords", targetNamespace = "urn:uddi-org:repl_v3", className = "org.uddi.repl_v3.GetChangeRecords") @@ -169,7 +177,7 @@ public interface UDDIReplicationPortType extends Remote { @WebParam(name = "responseLimitVector", targetNamespace = "urn:uddi-org:repl_v3") HighWaterMarkVectorType responseLimitVector) throws DispositionReportFaultMessage, RemoteException; - +*/ /** * <p class="MsoBodyText">Nodes can inform other nodes that they have new * change records available for consumption by replication by using this --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
