Repository: juddi Updated Branches: refs/heads/master 53d6f2d2c -> ba85baa1c
JUDDI-241 revising the validation logic for inbound replication data. this prevents accidental and malicious custody transfers while still allowing valid transfers to occur, also prevents malicious updates Project: http://git-wip-us.apache.org/repos/asf/juddi/repo Commit: http://git-wip-us.apache.org/repos/asf/juddi/commit/ba85baa1 Tree: http://git-wip-us.apache.org/repos/asf/juddi/tree/ba85baa1 Diff: http://git-wip-us.apache.org/repos/asf/juddi/diff/ba85baa1 Branch: refs/heads/master Commit: ba85baa1ced45e28f13c5060a72a8bac30ba8c73 Parents: 53d6f2d Author: Alex <[email protected]> Authored: Sun Jan 18 16:12:27 2015 -0500 Committer: Alex <[email protected]> Committed: Sun Jan 18 16:12:27 2015 -0500 ---------------------------------------------------------------------- .../juddi/api/impl/UDDICustodyTransferImpl.java | 4 +- .../juddi/api/impl/UDDIPublicationImpl.java | 47 ++- .../juddi/api/impl/UDDIReplicationImpl.java | 317 ++++++++++++------- .../java/org/apache/juddi/model/UddiEntity.java | 10 + 4 files changed, 267 insertions(+), 111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/juddi/blob/ba85baa1/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDICustodyTransferImpl.java ---------------------------------------------------------------------- diff --git a/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDICustodyTransferImpl.java b/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDICustodyTransferImpl.java index 352123c..3421d81 100644 --- a/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDICustodyTransferImpl.java +++ b/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDICustodyTransferImpl.java @@ -261,8 +261,10 @@ public class UDDICustodyTransferImpl extends AuthenticatedService implements UDD UddiEntity uddiEntity = em.find(UddiEntity.class, key); if (uddiEntity!=null) { + uddiEntity.setIsTransferInProgress(true); sourceNode = uddiEntity.getNodeId(); - break; //we only need one source node + em.merge(uddiEntity); + //save the fact we are expecting a transfer } else { http://git-wip-us.apache.org/repos/asf/juddi/blob/ba85baa1/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIPublicationImpl.java ---------------------------------------------------------------------- diff --git a/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIPublicationImpl.java b/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIPublicationImpl.java index 2417db9..a62e1c6 100644 --- a/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIPublicationImpl.java +++ b/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIPublicationImpl.java @@ -80,12 +80,14 @@ import org.uddi.api_v3.SaveBusiness; import org.uddi.api_v3.SaveService; import org.uddi.api_v3.SaveTModel; import org.uddi.api_v3.ServiceDetail; +import org.uddi.api_v3.TModel; import org.uddi.api_v3.TModelDetail; import org.uddi.repl_v3.ChangeRecordDelete; import org.uddi.repl_v3.ChangeRecordDeleteAssertion; import org.uddi.repl_v3.ChangeRecordHide; import org.uddi.repl_v3.ChangeRecordIDType; import org.uddi.repl_v3.ChangeRecordNewData; +import org.uddi.repl_v3.ChangeRecordNewDataConditional; import org.uddi.repl_v3.ChangeRecordPublisherAssertion; import org.uddi.v3_service.DispositionReportFaultMessage; import org.uddi.v3_service.UDDIPublicationPortType; @@ -157,7 +159,7 @@ public class UDDIPublicationImpl extends AuthenticatedService implements UDDIPub em.merge(existingPubAssertion); persistNewAssertion = false; - changes.add(getChangeRecord_deletePublisherAssertion(apiPubAssertion, node, existingPubAssertion.getToCheck().equalsIgnoreCase("false"), existingPubAssertion.getFromCheck().equalsIgnoreCase("false"), System.currentTimeMillis())); + changes.add(getChangeRecord_deletePublisherAssertion(apiPubAssertion, node, existingPubAssertion.getToCheck().equalsIgnoreCase("false"), existingPubAssertion.getFromCheck().equalsIgnoreCase("false"), System.currentTimeMillis())); } else { // Otherwise, it is a new relationship between these entities. Remove the old one so the new one can be added. // TODO: the model only seems to allow one assertion per two business (primary key is fromKey and toKey). Spec seems to imply as @@ -343,8 +345,9 @@ public class UDDIPublicationImpl extends AuthenticatedService implements UDDIPub org.apache.juddi.model.PublisherAssertion existingPubAssertion = em.find(org.apache.juddi.model.PublisherAssertion.class, modelPubAssertion.getId()); - if (existingPubAssertion==null) + if (existingPubAssertion == null) { throw new InvalidValueException(new ErrorMessage("E_assertionNotFound")); + } boolean fromkey = publisher.isOwner(em.find(BusinessEntity.class, entity.getFromKey())); boolean tokey = publisher.isOwner(em.find(BusinessEntity.class, entity.getToKey())); @@ -362,7 +365,7 @@ public class UDDIPublicationImpl extends AuthenticatedService implements UDDIPub em.persist(existingPubAssertion); } - changes.add(getChangeRecord_deletePublisherAssertion(entity, node, tokey,fromkey, existingPubAssertion.getModified().getTime())); + changes.add(getChangeRecord_deletePublisherAssertion(entity, node, tokey, fromkey, existingPubAssertion.getModified().getTime())); } tx.commit(); @@ -924,6 +927,8 @@ public class UDDIPublicationImpl extends AuthenticatedService implements UDDIPub List<ChangeRecord> changes = new ArrayList<ChangeRecord>(); for (org.uddi.api_v3.TModel apiTModel : apiTModelList) { + // Object obj=em.find( org.apache.juddi.model.Tmodel.class, apiTModel.getTModelKey()); + //just making changes to an existing tModel, no worries org.apache.juddi.model.Tmodel modelTModel = new org.apache.juddi.model.Tmodel(); MappingApiToModel.mapTModel(apiTModel, modelTModel); @@ -934,6 +939,15 @@ public class UDDIPublicationImpl extends AuthenticatedService implements UDDIPub result.getTModel().add(apiTModel); changes.add(getChangeRecord(modelTModel, apiTModel, node)); + /* + //TODO JUDDI-915 + if (obj != null) { + + changes.add(getChangeRecord(modelTModel, apiTModel, node)); + } else { + //special case for replication, must setup a new data conditional change record + changes.add(getChangeRecordConditional(modelTModel, apiTModel, node)); + }*/ } @@ -1533,4 +1547,31 @@ public class UDDIPublicationImpl extends AuthenticatedService implements UDDIPub return ret; } + private static ChangeRecord getChangeRecordConditional(Tmodel modelTModel, TModel apiTModel, String node) throws DispositionReportFaultMessage { + ChangeRecord cr = new ChangeRecord(); + if (!apiTModel.getTModelKey().equals(modelTModel.getEntityKey())) { + throw new FatalErrorException(new ErrorMessage("E_fatalError", "the model and api keys do not match when saving a tmodel!")); + } + cr.setEntityKey(modelTModel.getEntityKey()); + cr.setNodeID(node); + + cr.setRecordType(ChangeRecord.RecordType.ChangeRecordNewDataConditional); + org.uddi.repl_v3.ChangeRecord crapi = new org.uddi.repl_v3.ChangeRecord(); + crapi.setChangeID(new ChangeRecordIDType(node, -1L)); + crapi.setChangeRecordNewDataConditional(new ChangeRecordNewDataConditional()); + crapi.getChangeRecordNewDataConditional().setChangeRecordNewData(new ChangeRecordNewData()); + crapi.getChangeRecordNewDataConditional().getChangeRecordNewData().setTModel(apiTModel); + crapi.getChangeRecordNewDataConditional().getChangeRecordNewData().getTModel().setTModelKey(modelTModel.getEntityKey()); + crapi.getChangeRecordNewDataConditional().getChangeRecordNewData().setOperationalInfo(new OperationalInfo()); + MappingModelToApi.mapOperationalInfo(modelTModel, crapi.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo()); + StringWriter sw = new StringWriter(); + JAXB.marshal(crapi, sw); + try { + cr.setContents(sw.toString().getBytes("UTF8")); + } catch (UnsupportedEncodingException ex) { + logger.error(ex); + } + return cr; + } + } http://git-wip-us.apache.org/repos/asf/juddi/blob/ba85baa1/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java ---------------------------------------------------------------------- diff --git a/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java b/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java index 8f20593..703f58c 100644 --- a/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java +++ b/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java @@ -58,6 +58,7 @@ import org.apache.juddi.model.Operator; import org.apache.juddi.model.PublisherAssertion; import org.apache.juddi.model.PublisherAssertionId; import org.apache.juddi.model.Tmodel; +import org.apache.juddi.model.UddiEntity; import org.apache.juddi.replication.ReplicationNotifier; import static org.apache.juddi.replication.ReplicationNotifier.FetchEdges; import org.apache.juddi.v3.client.UDDIService; @@ -109,13 +110,13 @@ import org.uddi.v3_service.UDDIReplicationPortType; org.uddi.policy_v3_instanceparms.ObjectFactory.class }) public class UDDIReplicationImpl extends AuthenticatedService implements UDDIReplicationPortType { - + static void notifyConfigurationChange(ReplicationConfiguration oldConfig, ReplicationConfiguration newConfig) { //if the config is different Set<String> oldnodes = getNodes(oldConfig); Set<String> newNodes = getNodes(newConfig); - + Set<String> addedNodes = diffNodeList(oldnodes, newNodes); if (queue == null) { queue = new ConcurrentLinkedQueue<String>(); @@ -128,9 +129,9 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep queue.add(s); } } - + } - + private static Set<String> getNodes(ReplicationConfiguration oldConfig) { Set<String> ret = new HashSet<String>(); if (oldConfig == null) { @@ -169,19 +170,19 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep if (!found) { diff.add(lhs); } - + } return diff; } - + private UDDIServiceCounter serviceCounter; - + private static PullTimerTask timer = null; private long startBuffer = 5000l;//AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_START_BUFFER, 20000l); // 20s startup delay default private long interval = 5000l;// AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 300000l); //5 min default private static UDDIPublicationImpl pub = null; - + public UDDIReplicationImpl() { super(); if (pub == null) { @@ -195,17 +196,17 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep } catch (ConfigurationException ex) { logger.fatal(ex); } - + } - + private synchronized void Init() { if (queue == null) { queue = new ConcurrentLinkedQueue<String>(); } timer = new PullTimerTask(); - + } - + private boolean Excluded(HighWaterMarkVectorType changesAlreadySeen, ChangeRecord r) { if (changesAlreadySeen != null) { for (int i = 0; i < changesAlreadySeen.getHighWaterMark().size(); i++) { @@ -223,18 +224,18 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep * available */ private class PullTimerTask extends TimerTask { - + private Timer timer = null; - + public PullTimerTask() { super(); timer = new Timer(true); timer.scheduleAtFixedRate(this, startBuffer, interval); } - + @Override public void run() { - + if (!queue.isEmpty()) { logger.info("Replication change puller thread started. Queue size: " + queue.size()); } @@ -260,10 +261,10 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep GetChangeRecords body = new GetChangeRecords(); body.setRequestingNode(node); body.setResponseLimitCount(BigInteger.valueOf(20)); - + body.setChangesAlreadySeen(getLastChangeRecordFrom(poll)); logger.info("fetching updates from " + poll + " since " + body.getChangesAlreadySeen().getHighWaterMark().get(0).getOriginatingUSN() + " items still in the queue: " + queue.size()); - + List<ChangeRecord> records = replicationClient.getChangeRecords(body).getChangeRecord(); //ok now we need to persist the change records @@ -283,7 +284,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep } } } - + @Override public boolean cancel() { timer.cancel(); @@ -301,7 +302,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep return; } logger.debug("_______________________Remote change request " + rec.getChangeID().getNodeID() + ":" + rec.getChangeID().getOriginatingUSN()); - + if (rec.getChangeID().getNodeID().equalsIgnoreCase(node)) { logger.info("Just received a change record that i created, ignoring...."); return; @@ -317,7 +318,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep * a USN is less than the USN specified in the * changesAlreadySeen highWaterMarkVector. */ - + try { tx.begin(); //the change record rec must also be persisted!! @@ -337,13 +338,19 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep if (rec.getChangeRecordDelete() != null) { if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getBindingKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getBindingKey())) { //delete a binding template + UddiEntity ue = em.find(BindingTemplate.class, rec.getChangeRecordDelete().getBindingKey()); + ValidateNodeIdMisMatches(ue, node); pub.deleteBinding(rec.getChangeRecordDelete().getBindingKey(), em); } if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getBusinessKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getBusinessKey())) { //delete a business + UddiEntity ue = em.find(BusinessEntity.class, rec.getChangeRecordDelete().getBindingKey()); + ValidateNodeIdMisMatches(ue, node); pub.deleteBusiness(rec.getChangeRecordDelete().getBusinessKey(), em); } if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getServiceKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getServiceKey())) { + UddiEntity ue = em.find(BusinessService.class, rec.getChangeRecordDelete().getBindingKey()); + ValidateNodeIdMisMatches(ue, node); //delete a service pub.deleteService(rec.getChangeRecordDelete().getServiceKey(), em); } @@ -359,11 +366,13 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep * administrative function to * permanently remove a tModel. */ - Object tm=em.find(Tmodel.class, rec.getChangeRecordDelete().getTModelKey()); - if (tm!=null) + UddiEntity tm = em.find(Tmodel.class, rec.getChangeRecordDelete().getTModelKey()); + if (tm != null) { + ValidateNodeIdMisMatches(tm, node); em.remove(tm); - else + } else { logger.error("failed to adminstratively delete tmodel because it doesn't exist. " + rec.getChangeRecordDelete().getTModelKey()); + } //pub.deleteTModel(rec.getChangeRecordDelete().getTModelKey(), em); } } @@ -384,7 +393,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep throw new Exception("Inbound replication data is missiong node id! Change will not be applied"); } if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equalsIgnoreCase(node)) { - logger.warn("Inbound replication data is modifying locally owned data. This is not allowed"); + logger.warn("Inbound replication data is modifying locally owned data. This is not allowed, except for custody transfer"); } if (rec.getChangeRecordNewData().getBindingTemplate() != null) { //fetch the binding template if it exists already @@ -397,7 +406,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep logger.error("Replication error, attempting to insert a binding where the service doesn't exist yet"); } else { ValidateNodeIdMatches(rec.getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId()); - + org.apache.juddi.model.BindingTemplate bt = em.find(org.apache.juddi.model.BindingTemplate.class, rec.getChangeRecordNewData().getBindingTemplate().getBindingKey()); if (bt != null) { //ValidateNodeIdMatches(node, bt.getNodeId()); @@ -409,16 +418,44 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep // MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo()); em.persist(bt); } - + } else if (rec.getChangeRecordNewData().getBusinessEntity() != null) { - + BusinessEntity model = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewData().getBusinessEntity().getBusinessKey()); if (model != null) { - ValidateNodeIdMatches(rec.getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId()); - //em.remove(model); - MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model); - MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo()); - em.merge(model); + if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(node) + && !model.getNodeId().equals(node)) { + if (model.getIsTransferInProgress()) { + //allow the transfer + MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model); + MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo()); + MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo()); + model.setIsTransferInProgress(false); + em.merge(model); + } else { + //block it, unexpected transfer + throw new Exception("Unexpected entity transfer to this node from " + rec.getChangeID().getNodeID()); + } + + } else if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(node) + && model.getNodeId().equals(node)) { + //if destination is here and it's staying here, then this is strange also + //someone else updated one of my records + throw new Exception("unexpected modification of records that this server owns, " + model.getEntityKey()); + } else if (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(node) + && model.getNodeId().equals(node)) { + //this is also strange, destination is elsewhere however it's owned by me. + throw new Exception("unexpected transfer from this node to elsewhere, possible that the key in question exists at two places prior to replication sync, " + model.getEntityKey()); + + } else if (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(node) + && !model.getNodeId().equals(node)) { + //changes on a remote node, for an existing item + MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model); + MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo()); + em.merge(model); + + } + } else { model = new BusinessEntity(); MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model); @@ -431,39 +468,78 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep if (find == null) { logger.error("Replication error, attempting to insert a service where the business doesn't exist yet"); } else { - + org.apache.juddi.model.BusinessService model = null; model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewData().getBusinessService().getServiceKey()); if (model != null) { ValidateNodeIdMatches(rec.getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId()); em.remove(model); } - + model = new org.apache.juddi.model.BusinessService(); MappingApiToModel.mapBusinessService(rec.getChangeRecordNewData().getBusinessService(), model, find); MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo()); MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo()); - + em.persist(model); } - + } else if (rec.getChangeRecordNewData().getTModel() != null) { - + Tmodel model = em.find(org.apache.juddi.model.Tmodel.class, rec.getChangeRecordNewData().getTModel().getTModelKey()); if (model != null) { - ValidateNodeIdMatches(rec.getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId()); - em.remove(model); + //in the case of a transfer + //if the new entity is being transfer to ME, accept and i didn't previously own it, but only if the local record is flagged as transferable + //meaning, only accept if i'm expecting a transfer + if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(node) + && !model.getNodeId().equals(node)) { + if (model.getIsTransferInProgress()) { + //allow the transfer + em.remove(model); + model = new Tmodel(); + MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model); + MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo()); + model.setIsTransferInProgress(false); + em.persist(model); + } else { + //block it, unexpected transfer + throw new Exception("Unexpected entity transfer to this node from " + rec.getChangeID().getNodeID()); + } + + } else if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(node) + && model.getNodeId().equals(node)) { + //if destination is here and it's staying here, then this is strange also + //someone else updated one of my records + throw new Exception("unexpected modification of records that this server owns, " + model.getEntityKey()); + } else if (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(node) + && model.getNodeId().equals(node)) { + //this is also strange, destination is elsewhere however it's owned by me. + throw new Exception("unexpected transfer from this node to elsewhere, possible that the key in question exists at two places prior to replication sync, " + model.getEntityKey()); + + } else if (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(node) + && !model.getNodeId().equals(node)) { + //changes on a remote node, for an existing item + em.remove(model); + model = new Tmodel(); + MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model); + + MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo()); + + em.persist(model); + + } + } else { + model = new Tmodel(); + MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model); + + MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo()); + + em.persist(model); } - model = new Tmodel(); - MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model); - - MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo()); - - em.persist(model); } - + } - + } //</editor-fold> @@ -481,6 +557,8 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep if (existing == null) { logger.error("Unexpected delete/hide tmodel message received for non existing key " + key); } else { + //no one else can delete/hide my tmodel + ValidateNodeIdMisMatches(existing, node); existing.setDeleted(true); existing.setModified(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime()); existing.setModifiedIncludingChildren(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime()); @@ -491,26 +569,26 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep //<editor-fold defaultstate="collapsed" desc="changeRecordPublisherAssertion"> if (rec.getChangeRecordPublisherAssertion() != null) { - + logger.info("Repl CR Publisher Assertion"); //TODO are publisher assertions owned by a given node? PublisherAssertionId paid = new PublisherAssertionId(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getFromKey(), rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getToKey()); org.apache.juddi.model.PublisherAssertion model = em.find(org.apache.juddi.model.PublisherAssertion.class, paid); if (model != null) { logger.info("Repl CR Publisher Assertion - Existing"); - + if (rec.getChangeRecordPublisherAssertion().isFromBusinessCheck()) { model.setFromCheck("true"); } else { model.setFromCheck("false"); } - + if (rec.getChangeRecordPublisherAssertion().isToBusinessCheck()) { model.setToCheck("true"); } else { model.setToCheck("false"); } - + model.setKeyName(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getKeyedReference().getKeyName()); model.setKeyValue(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getKeyedReference().getKeyValue()); model.setTmodelKey(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getKeyedReference().getTModelKey()); @@ -524,20 +602,20 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep em.merge(model); } else { logger.info("Repl CR Publisher Assertion - new PA"); - + model = new PublisherAssertion(); MappingApiToModel.mapPublisherAssertion(rec.getChangeRecordPublisherAssertion().getPublisherAssertion(), model); model.setBusinessEntityByFromKey(null); model.setBusinessEntityByToKey(null); model.setBusinessEntityByFromKey(em.find(BusinessEntity.class, rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getFromKey())); model.setBusinessEntityByToKey(em.find(BusinessEntity.class, rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getToKey())); - + if (rec.getChangeRecordPublisherAssertion().isFromBusinessCheck()) { model.setFromCheck("true"); } else { model.setFromCheck("false"); } - + if (rec.getChangeRecordPublisherAssertion().isToBusinessCheck()) { model.setToCheck("true"); } else { @@ -557,7 +635,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep ReplicationNotifier.Enqueue(MappingApiToModel.mapChangeRecord(posack)); } if (rec.getChangeRecordNewDataConditional() != null) { - + if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID() == null) { throw new Exception("Inbound replication data is missiong node id!"); } @@ -576,11 +654,11 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep if (model == null) { logger.error("Replication error, attempting to insert a binding where the service doesn't exist yet"); } else { - + org.apache.juddi.model.BindingTemplate bt = em.find(org.apache.juddi.model.BindingTemplate.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBindingTemplate().getBindingKey()); if (bt != null) { ValidateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), bt.getNodeId()); - + em.remove(bt); } bt = new BindingTemplate(); @@ -589,9 +667,9 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep // MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo()); em.persist(bt); } - + } else if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessEntity() != null) { - + BusinessEntity model = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessEntity().getBusinessKey()); if (model != null) { ValidateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId()); @@ -605,31 +683,31 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo()); logger.warn("Name size on save is " + model.getBusinessNames().size()); em.persist(model); - + } if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService() != null) { BusinessEntity find = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService().getBusinessKey()); if (find == null) { logger.error("Replication error, attempting to insert a service where the business doesn't exist yet"); } else { - + org.apache.juddi.model.BusinessService model = null; model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService().getServiceKey()); if (model != null) { ValidateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId()); em.remove(model); } - + model = new org.apache.juddi.model.BusinessService(); MappingApiToModel.mapBusinessService(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService(), model, find); MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo()); MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo()); - + em.persist(model); } - + } else if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getTModel() != null) { - + Tmodel model = em.find(org.apache.juddi.model.Tmodel.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getTModel().getTModelKey()); if (model != null) { ValidateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId()); @@ -637,14 +715,14 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep } model = new Tmodel(); MappingApiToModel.mapTModel(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getTModel(), model); - + MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo()); - + em.persist(model); } - + } - + } if (rec.getChangeRecordNull() != null) { //No action required @@ -659,7 +737,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep } tx.commit(); - + } catch (Exception drfm) { logger.warn("Error persisting change record!", drfm); StringWriter sw = new StringWriter(); @@ -672,7 +750,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep em.close(); } } - + private HighWaterMarkVectorType getLastChangeRecordFrom(String notifyingNode) { HighWaterMarkVectorType ret = new HighWaterMarkVectorType(); ChangeRecordIDType cid = new ChangeRecordIDType(); @@ -688,9 +766,9 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep } catch (Exception ex) { logger.info(ex); } - + tx.rollback(); - + } catch (Exception drfm) { logger.warn("error caught fetching newest record from node " + notifyingNode, drfm); } finally { @@ -699,22 +777,47 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep } em.close(); } - + ret.getHighWaterMark().add(cid); - + return ret; } + } - // private void ValidateDontChangeMyRecordsAtAnotherNode(String ) - private void ValidateNodeIdMatches(String newNodeId, String currentOwningNode) throws Exception { + /** + * used to check for alterations on *this node's data from another node, + * which isn't allowed + * + * @param ue + * @param node + * @throws Exception + */ + private static void ValidateNodeIdMisMatches(UddiEntity ue, String node) throws Exception { + if (ue == null) { + return;//object doesn't exist + } + if (ue.getNodeId().equals(node)) { + throw new Exception("Alert! attempt to alter locally owned entity " + ue.getEntityKey() + " owned by " + ue.getAuthorizedName() + "@" + ue.getNodeId()); + } + } + + /** + * use to validate that changed data maintained ownership, except for + * business entities and tmodels since they allow transfer + * + * @param newNodeId + * @param currentOwningNode + * @throws Exception + */ + private static void ValidateNodeIdMatches(String newNodeId, String currentOwningNode) throws Exception { if (newNodeId == null || currentOwningNode == null) { throw new Exception("either the local node ID is null or the inbound replication data's node id is null"); } //only time this is allowed is custody transfer if (!newNodeId.equals(currentOwningNode)) { + logger.info("AUDIT, custody transfer from node, " + currentOwningNode + " to " + newNodeId + " current node is " + node); //throw new Exception("node id mismatch!"); - logger.info("AUDIT, custory transfer from node, " + currentOwningNode + " to " + newNodeId); } //if i already have a record and "own it" and the remote node has a record with the same key, reject the update @@ -737,14 +840,14 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep //throw new Exception("node id mismatch! this node already has a record for key " + newDataOperationalInfo.getEntityKey() + " and I'm the authority for it."); } } - + private synchronized UDDIReplicationPortType getReplicationClient(String node) { if (cache.containsKey(node)) { return cache.get(node); } UDDIService svc = new UDDIService(); UDDIReplicationPortType replicationClient = svc.getUDDIReplicationPort(); - + EntityManager em = PersistenceManager.getEntityManager(); EntityTransaction tx = em.getTransaction(); try { @@ -753,7 +856,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep sql.toString(); Query qry = em.createQuery(sql.toString()); qry.setMaxResults(1); - + org.apache.juddi.model.ReplicationConfiguration resultList = (org.apache.juddi.model.ReplicationConfiguration) qry.getSingleResult(); for (Operator o : resultList.getOperator()) { if (o.getOperatorNodeID().equalsIgnoreCase(node)) { @@ -763,7 +866,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep } } tx.rollback(); - + } catch (Exception ex) { logger.fatal("Node not found!" + node, ex); } finally { @@ -774,7 +877,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep } //em.close(); return null; - + } private Map<String, UDDIReplicationPortType> cache = new HashMap<String, UDDIReplicationPortType>(); @@ -788,11 +891,11 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep long startTime = System.currentTimeMillis(); long procTime = System.currentTimeMillis() - startTime; serviceCounter.update(ReplicationQuery.DO_PING, QueryStatus.SUCCESS, procTime); - + return node; - + } - + @SOAPBinding(parameterStyle = SOAPBinding.ParameterStyle.BARE) @WebResult(name = "changeRecords", targetNamespace = "urn:uddi-org:repl_v3", partName = "body") // @WebMethod(operationName = "get_changeRecords", action = "get_changeRecords") @@ -805,7 +908,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep HighWaterMarkVectorType changesAlreadySeen = body.getChangesAlreadySeen(); BigInteger responseLimitCount = body.getResponseLimitCount(); HighWaterMarkVectorType responseLimitVector = body.getResponseLimitVector(); - + new ValidateReplication(null).validateGetChangeRecords(requestingNode, changesAlreadySeen, responseLimitCount, responseLimitVector, FetchEdges(), ctx); //TODO should we validate that "requestingNode" is in the replication config? @@ -838,7 +941,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep tx.begin(); Long firstrecord = 0L; Long lastrecord = null; - + if (changesAlreadySeen != null) { //this is basically a lower limit (i.e. the newest record that was processed by the requestor //therefore we want the oldest record stored locally to return to the requestor for processing @@ -857,7 +960,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep } } } - + logger.info("Query db for replication changes, lower index is " + (firstrecord) + " last index " + lastrecord + " record limit " + maxrecords); Query createQuery = null; /* @@ -887,7 +990,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep createQuery.setMaxResults(maxrecords); createQuery.setParameter("inbound", firstrecord); createQuery.setParameter("node", node); - + List<org.apache.juddi.model.ChangeRecord> records = (List<org.apache.juddi.model.ChangeRecord>) createQuery.getResultList(); logger.info(records.size() + " CR records returned from query"); for (int i = 0; i < records.size(); i++) { @@ -895,18 +998,18 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep if (!Excluded(changesAlreadySeen, r)) { ret.add(r); } - + } - + tx.rollback(); long procTime = System.currentTimeMillis() - startTime; serviceCounter.update(ReplicationQuery.GET_CHANGERECORDS, QueryStatus.SUCCESS, procTime); - + } catch (Exception ex) { logger.fatal("Error, this node is: " + node, ex); throw new FatalErrorException(new ErrorMessage("E_fatalError", ex.getMessage())); - + } finally { if (tx.isActive()) { tx.rollback(); @@ -934,12 +1037,12 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep public List<ChangeRecordIDType> getHighWaterMarks() throws DispositionReportFaultMessage { long startTime = System.currentTimeMillis(); - + List<ChangeRecordIDType> ret = new ArrayList<ChangeRecordIDType>(); //fetch from database the highest known watermark ReplicationConfiguration FetchEdges = FetchEdges(); - + EntityManager em = PersistenceManager.getEntityManager(); EntityTransaction tx = em.getTransaction(); HashMap<String, Long> map = new HashMap<String, Long>(); @@ -962,7 +1065,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep //per the spec } map.put(nextNode, id); - + } } } @@ -977,21 +1080,21 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep x.setNodeID(node); x.setOriginatingUSN(id); ret.add(x); - + tx.rollback(); long procTime = System.currentTimeMillis() - startTime; serviceCounter.update(ReplicationQuery.GET_HIGHWATERMARKS, QueryStatus.SUCCESS, procTime); - + } catch (Exception drfm) { throw new FatalErrorException(new ErrorMessage("E_fatalError", drfm.getMessage())); - + } finally { if (tx.isActive()) { tx.rollback(); } em.close(); } - + Iterator<Map.Entry<String, Long>> iterator = map.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry<String, Long> next = iterator.next(); @@ -1015,7 +1118,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep //some other node just told us there's new records available, call //getChangeRecords from the remote node asynch new ValidateReplication(null).validateNotifyChangeRecordsAvailable(body, ctx); - + logger.info(body.getNotifyingNode() + " just told me that there are change records available, enqueuing...size is " + queue.size()); if (!queue.contains(body.getNotifyingNode())) { queue.add(body.getNotifyingNode()); @@ -1069,9 +1172,9 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep if (!ok) { throw new TransferNotAllowedException(new ErrorMessage("E_transferNotAllowedUnknownNode")); } - + new ValidateReplication(null).validateTransfer(em, body); - + TransferEntities te = new TransferEntities(); te.setKeyBag(body.getKeyBag()); te.setTransferToken(body.getTransferToken()); @@ -1081,7 +1184,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep //discard the token logger.debug("request validated, processing transfer"); List<ChangeRecord> executeTransfer = new UDDICustodyTransferImpl().executeTransfer(te, em, body.getTransferOperationalInfo().getAuthorizedName(), body.getTransferOperationalInfo().getNodeID()); - + for (ChangeRecord c : executeTransfer) { try { c.setChangeID(new ChangeRecordIDType()); @@ -1146,5 +1249,5 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep } } } - + } http://git-wip-us.apache.org/repos/asf/juddi/blob/ba85baa1/juddi-core/src/main/java/org/apache/juddi/model/UddiEntity.java ---------------------------------------------------------------------- diff --git a/juddi-core/src/main/java/org/apache/juddi/model/UddiEntity.java b/juddi-core/src/main/java/org/apache/juddi/model/UddiEntity.java index 7e1b35b..a91171d 100644 --- a/juddi-core/src/main/java/org/apache/juddi/model/UddiEntity.java +++ b/juddi-core/src/main/java/org/apache/juddi/model/UddiEntity.java @@ -40,6 +40,7 @@ public abstract class UddiEntity implements Comparable<UddiEntity>{ protected Date modifiedIncludingChildren; protected String nodeId; protected String authorizedName; + protected boolean xfer = false; @Id @Column(name = "entity_key", nullable = false, length = 255) @@ -116,4 +117,13 @@ public abstract class UddiEntity implements Comparable<UddiEntity>{ else return 0; } + public void setIsTransferInProgress(boolean b) { + xfer = b; + } + @Column(name="xfer", nullable=false) + public boolean getIsTransferInProgress() + { + return xfer; + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
