http://git-wip-us.apache.org/repos/asf/juddi/blob/8b95902b/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java
----------------------------------------------------------------------
diff --git 
a/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java 
b/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java
index 32fe3f8..5f97703 100644
--- 
a/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java
+++ 
b/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java
@@ -17,38 +17,48 @@
 package org.apache.juddi.api.impl;
 
 import java.math.BigInteger;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 import javax.persistence.EntityManager;
 import javax.persistence.EntityTransaction;
+import javax.persistence.Query;
+import javax.xml.datatype.DatatypeConfigurationException;
 import javax.xml.ws.BindingProvider;
 import org.apache.commons.configuration.ConfigurationException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.juddi.api.util.PublicationQuery;
+import static org.apache.juddi.api.impl.AuthenticatedService.logger;
 import org.apache.juddi.api.util.QueryStatus;
 import org.apache.juddi.api.util.ReplicationQuery;
 import org.apache.juddi.config.AppConfig;
 import org.apache.juddi.config.PersistenceManager;
 import org.apache.juddi.config.Property;
 import org.apache.juddi.mapping.MappingApiToModel;
+import org.apache.juddi.mapping.MappingModelToApi;
+import org.apache.juddi.model.BusinessEntity;
+import org.apache.juddi.model.BusinessService;
 import org.apache.juddi.model.Node;
-import org.apache.juddi.model.UddiEntityPublisher;
+import org.apache.juddi.model.Tmodel;
+import static org.apache.juddi.replication.ReplicationNotifier.FetchEdges;
 import org.apache.juddi.v3.client.UDDIService;
 import org.apache.juddi.v3.error.ErrorMessage;
 import org.apache.juddi.v3.error.FatalErrorException;
-import org.apache.juddi.validation.ValidatePublish;
 import org.apache.juddi.validation.ValidateReplication;
-import org.uddi.api_v3.DispositionReport;
-import org.uddi.api_v3.Result;
+import org.uddi.api_v3.OperationalInfo;
 import org.uddi.custody_v3.DiscardTransferToken;
 import org.uddi.repl_v3.ChangeRecord;
 import org.uddi.repl_v3.ChangeRecordIDType;
 import org.uddi.repl_v3.DoPing;
 import org.uddi.repl_v3.HighWaterMarkVectorType;
 import org.uddi.repl_v3.NotifyChangeRecordsAvailable;
+import org.uddi.repl_v3.ReplicationConfiguration;
 import org.uddi.repl_v3.TransferCustody;
 import org.uddi.v3_service.DispositionReportFaultMessage;
 import org.uddi.v3_service.UDDIReplicationPortType;
@@ -69,15 +79,271 @@ import org.uddi.v3_service.UDDIReplicationPortType;
  */
 public class UDDIReplicationImpl extends AuthenticatedService implements 
UDDIReplicationPortType {
 
-        private static Log log = LogFactory.getLog(UDDIReplicationImpl.class);
         private UDDIServiceCounter serviceCounter;
 
+        private static PullTimerTask timer = null;
+        private long startBuffer = 
20000l;//AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_START_BUFFER,
 20000l); // 20s startup delay default 
+        private long interval = 300000l;// 
AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 
300000l); //5 min default
+
+        private static UDDIPublicationImpl pub = null;
+
         public UDDIReplicationImpl() {
                 super();
+                if (pub == null) {
+                        pub = new UDDIPublicationImpl();
+                }
                 serviceCounter = 
ServiceCounterLifecycleResource.getServiceCounter(UDDIReplicationImpl.class);
+                Init();
+                try {
+                        startBuffer = 
AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_START_BUFFER, 
20000l); // 20s startup delay default 
+                        interval = 
AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 
300000l); //5 min default
+                } catch (ConfigurationException ex) {
+                        logger.fatal(ex);
+                }
 
         }
 
+        private synchronized void Init() {
+                if (queue == null) {
+                        queue = new 
ConcurrentLinkedDeque<NotifyChangeRecordsAvailable>();
+                }
+                timer = new PullTimerTask();
+
+        }
+
+        private boolean Excluded(HighWaterMarkVectorType changesAlreadySeen, 
ChangeRecord r) {
+                if (changesAlreadySeen != null) {
+                        for (int i = 0; i < 
changesAlreadySeen.getHighWaterMark().size(); i++) {
+                                if 
(changesAlreadySeen.getHighWaterMark().get(i).getNodeID().equals(r.getChangeID().getNodeID())
+                                     && 
changesAlreadySeen.getHighWaterMark().get(i).getOriginatingUSN().equals(r.getChangeID().getOriginatingUSN()))
 {
+                                        return true;
+                                }
+                        }
+                }
+                return false;
+        }
+
+        private class PullTimerTask extends TimerTask {
+
+                private Timer timer = null;
+
+                public PullTimerTask() {
+                        super();
+                        timer = new Timer(true);
+                        timer.scheduleAtFixedRate(this, startBuffer, interval);
+                }
+
+                @Override
+                public void run() {
+
+                        //ok someone told me there's a change available
+                        while (!queue.isEmpty()) {
+                                NotifyChangeRecordsAvailable poll = 
queue.poll();
+                                if (poll != null) {
+                                        UDDIReplicationPortType 
replicationClient = getReplicationClient(poll.getNotifyingNode());
+                                        try {
+                                                //ok now get all the changes
+                                                List<ChangeRecord> records
+                                                     = 
replicationClient.getChangeRecords(node,
+                                                          null, null, 
poll.getChangesAvailable());
+                                                //ok now we need to persist 
the change records
+                                                for (int i = 0; i < 
records.size(); i++) {
+                                                        
PersistChangeRecord(records.get(i));
+                                                }
+                                        } catch (Exception ex) {
+                                                logger.equals(ex);
+                                        }
+                                }
+                        }
+                }
+
+                @Override
+                public boolean cancel() {
+                        timer.cancel();
+                        return super.cancel();
+                }
+
+                /**
+                 * someone told me there's a change available, we retrieved it
+                 * and are processing the changes locally
+                 *
+                 * @param rec
+                 */
+                private void PersistChangeRecord(ChangeRecord rec) {
+                        if (rec == null) {
+                                return;
+                        }
+                        EntityManager em = 
PersistenceManager.getEntityManager();
+                        EntityTransaction tx = em.getTransaction();
+                        /**
+                         * In nodes that support pre-bundled replication
+                         * responses, the recipient of the get_changeRecords
+                         * message MAY return more change records than 
requested
+                         * by the caller. In this scenario, the caller MUST 
also
+                         * be prepared to deal with such redundant changes 
where
+                         * a USN is less than the USN specified in the
+                         * changesAlreadySeen highWaterMarkVector.
+                         */
+                        try {
+                                tx.begin();
+                                //the change record rec must also be 
persisted!!
+                                
em.persist(MappingApiToModel.mapChangeRecord(rec));
+                                //<editor-fold defaultstate="collapsed" 
desc="delete a record">
+
+                                if (rec.getChangeRecordDelete() != null) {
+                                        if (rec.getChangeRecordDelete() != 
null && rec.getChangeRecordDelete().getBindingKey() != null && 
!"".equalsIgnoreCase(rec.getChangeRecordDelete().getBindingKey())) {
+                                                //delete a binding template
+                                                
pub.deleteBinding(rec.getChangeRecordDelete().getBindingKey(), em);
+                                        }
+                                        if (rec.getChangeRecordDelete() != 
null && rec.getChangeRecordDelete().getBusinessKey() != null && 
!"".equalsIgnoreCase(rec.getChangeRecordDelete().getBusinessKey())) {
+                                                //delete a business 
+                                                
pub.deleteBusiness(rec.getChangeRecordDelete().getBusinessKey(), em);
+                                        }
+                                        if (rec.getChangeRecordDelete() != 
null && rec.getChangeRecordDelete().getServiceKey() != null && 
!"".equalsIgnoreCase(rec.getChangeRecordDelete().getServiceKey())) {
+                                                //delete a service 
+                                                
pub.deleteService(rec.getChangeRecordDelete().getServiceKey(), em);
+                                        }
+                                        if (rec.getChangeRecordDelete() != 
null && rec.getChangeRecordDelete().getTModelKey() != null && 
!"".equalsIgnoreCase(rec.getChangeRecordDelete().getTModelKey())) {
+                                                //delete a tmodel 
+                                                /**
+                                                 * The changeRecordDelete for a
+                                                 * tModel does not correspond 
to
+                                                 * any API described in this
+                                                 * specification and should 
only
+                                                 * appear in the replication
+                                                 * stream as the result of an
+                                                 * administrative function to
+                                                 * permanently remove a tModel.
+                                                 */
+                                                
pub.deleteTModel(rec.getChangeRecordDelete().getTModelKey(), em);
+                                        }
+                                }
+                                if (rec.getChangeRecordDeleteAssertion() != 
null && rec.getChangeRecordDeleteAssertion().getPublisherAssertion() != null) {
+                                        //delete a pa template                 
           
+                                        
pub.deletePublisherAssertion(rec.getChangeRecordDeleteAssertion().getPublisherAssertion(),
 em);
+                                }
+
+//</editor-fold>
+                                //<editor-fold defaultstate="collapsed" 
desc="New Data">
+                                if (rec.getChangeRecordNewData() != null) {
+
+                                        if 
(rec.getChangeRecordNewData().getOperationalInfo().getNodeID() == null) {
+                                                throw new Exception("Inbound 
replication data is missiong node id!");
+                                        }
+
+                                        //The operationalInfo element MUST 
contain the operational information associated with the indicated new data.
+                                        if 
(rec.getChangeRecordNewData().getOperationalInfo() == null) {
+                                                logger.warn("Inbound 
replication data does not have the required OperationalInfo element and is NOT 
spec compliant. Data will be ignored");
+                                        } else {
+                                                if 
(rec.getChangeRecordNewData().getBindingTemplate() != null) {
+                                                        //fetch the binding 
template if it exists already
+                                                        //if it exists, 
+                                                        //      confirm the 
owning node, it shouldn't be the local node id, if it is, throw
+                                                        //      the owning 
node should be the same as it was before
+
+                                                        BusinessService model 
= em.find(org.apache.juddi.model.BusinessService.class, 
rec.getChangeRecordNewData().getBindingTemplate().getServiceKey());
+                                                        if (model == null) {
+                                                                
logger.error("Replication error, attempting to insert a binding where the 
service doesn't exist yet");
+                                                        } else {
+                                                                
ValidateNodeIdMatches(model.getNodeId(), 
rec.getChangeRecordNewData().getOperationalInfo());
+
+                                                                
org.apache.juddi.model.BindingTemplate modelT = new 
org.apache.juddi.model.BindingTemplate();
+                                                                
MappingApiToModel.mapBindingTemplate(rec.getChangeRecordNewData().getBindingTemplate(),
 modelT, model);
+                                                                
MappingApiToModel.mapOperationalInfo(model, 
rec.getChangeRecordNewData().getOperationalInfo());
+                                                                
em.persist(model);
+                                                        }
+
+                                                } else if 
(rec.getChangeRecordNewData().getBusinessEntity() != null) {
+
+                                                        BusinessEntity model = 
em.find(org.apache.juddi.model.BusinessEntity.class, 
rec.getChangeRecordNewData().getBusinessEntity().getBusinessKey());
+                                                        if (model == null) {
+                                                                model = new 
BusinessEntity();
+                                                        } else {
+                                                                
ValidateNodeIdMatches(model.getNodeId(), 
rec.getChangeRecordNewData().getOperationalInfo());
+                                                        }
+                                                        
MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(),
 model);
+                                                        
MappingApiToModel.mapOperationalInfo(model, 
rec.getChangeRecordNewData().getOperationalInfo());
+
+                                                        em.persist(model);
+
+                                                }
+                                                if 
(rec.getChangeRecordNewData().getBusinessService() != null) {
+                                                        BusinessEntity find = 
em.find(org.apache.juddi.model.BusinessEntity.class, 
rec.getChangeRecordNewData().getBusinessService().getBusinessKey());
+                                                        if (find == null) {
+                                                                
logger.error("Replication error, attempting to insert a service where the 
business doesn't exist yet");
+                                                        } else {
+                                                                
org.apache.juddi.model.BusinessService model = new 
org.apache.juddi.model.BusinessService();
+                                                                
MappingApiToModel.mapBusinessService(rec.getChangeRecordNewData().getBusinessService(),
 model, find);
+                                                                
MappingApiToModel.mapOperationalInfo(model, 
rec.getChangeRecordNewData().getOperationalInfo());
+
+                                                                
em.persist(model);
+                                                        }
+
+                                                } else if 
(rec.getChangeRecordNewData().getTModel() != null) {
+                                                        Tmodel model = new 
Tmodel();
+                                                        
MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model);
+
+                                                        
MappingApiToModel.mapOperationalInfo(model, 
rec.getChangeRecordNewData().getOperationalInfo());
+
+                                                        em.persist(model);
+                                                }
+
+                                        }
+
+                                }
+//</editor-fold>
+
+                                // changeRecordNull no action needed
+                                // changeRecordHide tmodel only
+                                //<editor-fold defaultstate="collapsed" 
desc="hide tmodel">
+                                if (rec.getChangeRecordHide() != null) {
+                                        /*
+                                         A changeRecordHide element 
corresponds to the behavior of hiding a tModel described in the delete_tModel 
in the Publish API section of this Specification.  A tModel listed in a 
changeRecordHide should be marked as hidden, so that it is not returned in 
response to a find_tModel API call.
+                                        
+                                         The changeRecordHide MUST contain a 
modified timestamp to allow multi-node registries to calculate consistent 
modifiedIncludingChildren timestamps as described in Section 3.8 
operationalInfo Structure.
+                                         */
+                                        String key = 
rec.getChangeRecordHide().getTModelKey();
+                                        org.apache.juddi.model.Tmodel existing 
= em.find(org.apache.juddi.model.Tmodel.class, key);
+                                        if (existing == null) {
+                                                logger.error("Unexpected 
delete/hide tmodel message received for non existing key " + key);
+                                        } else {
+                                                existing.setDeleted(true);
+                                                
existing.setModified(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime());
+                                                
existing.setModifiedIncludingChildren(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime());
+                                                em.persist(existing);
+                                        }
+                                }
+//</editor-fold>
+
+                                //<editor-fold defaultstate="collapsed" 
desc="changeRecordPublisherAssertion">
+                                if (rec.getChangeRecordPublisherAssertion() != 
null) {
+//TODO implement
+                                }
+//</editor-fold>
+
+                                tx.commit();
+
+                        } catch (Exception drfm) {
+                                logger.warn(drfm);
+                        } finally {
+                                if (tx.isActive()) {
+                                        tx.rollback();
+                                }
+                                em.close();
+                        }
+                }
+
+        }
+
+        private static void ValidateNodeIdMatches(String nodeId, 
OperationalInfo operationalInfo) throws Exception {
+                if (nodeId == null || operationalInfo == null) {
+                        throw new Exception("either the local node ID is null 
or the inbound replication data's node id is null");
+                }
+                if (!nodeId.equals(operationalInfo.getNodeID())) {
+                        throw new Exception("node id mismatch!");
+                }
+        }
+
         private synchronized UDDIReplicationPortType 
getReplicationClient(String node) {
                 if (cache.containsKey(node)) {
                         return cache.get(node);
@@ -92,7 +358,7 @@ public class UDDIReplicationImpl extends 
AuthenticatedService implements UDDIRep
                         cache.put(node, replicationClient);
                         return replicationClient;
                 } catch (Exception ex) {
-                        log.fatal("Node not found!" + node, ex);
+                        logger.fatal("Node not found!" + node, ex);
                 } finally {
                         if (tx.isActive()) {
                                 tx.rollback();
@@ -115,82 +381,229 @@ public class UDDIReplicationImpl extends 
AuthenticatedService implements UDDIRep
                 long startTime = System.currentTimeMillis();
                 long procTime = System.currentTimeMillis() - startTime;
                 serviceCounter.update(ReplicationQuery.DO_PING, 
QueryStatus.SUCCESS, procTime);
-                try {
-                        return 
AppConfig.getConfiguration().getString(Property.JUDDI_NODE_ROOT_BUSINESS);
-                } catch (ConfigurationException ex) {
-                        log.fatal("Unable to load configuration!", ex);
-                }
-                throw new FatalErrorException(new 
ErrorMessage("errors.configuration.Retrieval"));
+
+                return node;
+
         }
 
+        @Override
         public List<ChangeRecord> getChangeRecords(String requestingNode,
-                HighWaterMarkVectorType changesAlreadySeen,
-                BigInteger responseLimitCount,
-                HighWaterMarkVectorType responseLimitVector)
-                throws DispositionReportFaultMessage {
+             HighWaterMarkVectorType changesAlreadySeen,
+             BigInteger responseLimitCount,
+             HighWaterMarkVectorType responseLimitVector)
+             throws DispositionReportFaultMessage {
                 long startTime = System.currentTimeMillis();
-                long procTime = System.currentTimeMillis() - startTime;
-                serviceCounter.update(ReplicationQuery.GET_CHANGERECORDS,
-                        QueryStatus.SUCCESS, procTime);
 
-                //TODO fetch all records that have changed since 
changesAlreadySeen
-                ChangeRecord r = new ChangeRecord();
-                
-                ValidateReplication.unsupportedAPICall();
-                return null;
+                new 
ValidateReplication(null).validateGetChangeRecords(requestingNode, 
changesAlreadySeen, responseLimitCount, responseLimitVector, FetchEdges(), ctx);
+
+                //TODO should we validate that "requestingNode" is in the 
replication config?
+                List<ChangeRecord> ret = new ArrayList<ChangeRecord>();
+                EntityManager em = PersistenceManager.getEntityManager();
+                EntityTransaction tx = em.getTransaction();
+
+                /**
+                 * More specifically, the recipient determines the particular
+                 * change records that are returned by comparing the 
originating
+                 * USNs in the caller’s high water mark vector with the
+                 * originating USNs of each of the changes the recipient has
+                 * seen from others or generated by itself. The recipient 
SHOULD
+                 * only return change records that have originating USNs that
+                 * are greater than those listed in the changesAlreadySeen
+                 * highWaterMarkVector and less than the limit required by
+                 * either the responseLimitCount or the responseLimitVector.
+                 *
+                 *
+                 * Part of the message is a high water mark vector that 
contains
+                 * for each node of the registry the originating USN of the 
most
+                 * recent change record that has been successfully processed by
+                 * the invocating node
+                 */
+                int maxrecords = 100;
+                if (responseLimitCount != null) {
+                        maxrecords = responseLimitCount.intValue();
+                }
+                try {
+                        tx.begin();
+                        Long firstrecord = 1L;
+                        Long lastrecord = null;
+
+                        if (changesAlreadySeen != null) {
+                                //this is basically a lower limit (i.e. the 
newest record that was processed by the requestor
+                                //therefore we want the oldest record stored 
locally to return to the requestor for processing
+                                for (int i = 0; i < 
changesAlreadySeen.getHighWaterMark().size(); i++) {
+                                        if 
(responseLimitVector.getHighWaterMark().get(i).getNodeID().equals(node)) {
+                                                firstrecord = 
changesAlreadySeen.getHighWaterMark().get(i).getOriginatingUSN() + 1;
+                                        }
+                                }
+                        }
+                        if (responseLimitVector != null) {
+                                //using responseLimitVector, indicating for 
each node in the graph the first change originating there that he does not wish 
to be returned.
+                                //upper limit basically
+                                for (int i = 0; i < 
responseLimitVector.getHighWaterMark().size(); i++) {
+                                        if 
(responseLimitVector.getHighWaterMark().get(i).getNodeID().equals(node)) {
+                                                lastrecord = 
responseLimitVector.getHighWaterMark().get(i).getOriginatingUSN();
+                                        }
+                                }
+                        }
+
+                        Query createQuery = null;
+                        if (lastrecord != null) {
+                                createQuery = em.createQuery("select e from 
ChangeRecord e where (e.id > :inbound and e.nodeID = :node and e.id < 
:lastrecord) OR (e.originatingUSN > :inbound and e.nodeID != :node and 
e.originatingUSN < :lastrecord) order by e.id ASC");
+                                createQuery.setParameter("lastrecord", 
lastrecord);
+                        } else {
+                                createQuery = em.createQuery("select e from 
ChangeRecord e where (e.id > :inbound and e.nodeID = :node) OR 
(e.originatingUSN > :inbound and e.nodeID != :node) order by e.id ASC");
+                        }
+                        createQuery.setMaxResults(maxrecords);
+                        createQuery.setParameter("inbound", firstrecord);
+                        createQuery.setParameter("node", node);
+
+                        List<org.apache.juddi.model.ChangeRecord> records = 
(List<org.apache.juddi.model.ChangeRecord>) 
(org.apache.juddi.model.ChangeRecord) createQuery.getResultList();
+                        for (int i = 0; i < records.size(); i++) {
+                                ChangeRecord r = 
MappingModelToApi.mapChangeRecord(records.get(i));
+                                if (!Excluded(changesAlreadySeen, r)) {
+                                        ret.add(r);
+                                }
+                                
+                        }
+                        
+
+                        tx.rollback();
+                        long procTime = System.currentTimeMillis() - startTime;
+                        
serviceCounter.update(ReplicationQuery.GET_CHANGERECORDS,
+                             QueryStatus.SUCCESS, procTime);
+
+                } catch (Exception ex) {
+                        logger.fatal("Error, this node is: " + node, ex);
+                        throw new FatalErrorException(new 
ErrorMessage("E_fatalError", ex.getMessage()));
+
+                } finally {
+                        if (tx.isActive()) {
+                                tx.rollback();
+                        }
+                        em.close();
+                }
+                return ret;
         }
 
+        /**
+         * This UDDI API message provides a means to obtain a list of
+         * highWaterMark element containing the highest known USN for all nodes
+         * in the replication graph. If there is no graph, we just return the
+         * local bits
+         *
+         * @return
+         * @throws DispositionReportFaultMessage
+         */
+        @Override
         public List<ChangeRecordIDType> getHighWaterMarks()
-                throws DispositionReportFaultMessage {
+             throws DispositionReportFaultMessage {
                 long startTime = System.currentTimeMillis();
-                long procTime = System.currentTimeMillis() - startTime;
-                serviceCounter.update(ReplicationQuery.GET_HIGHWATERMARKS, 
QueryStatus.SUCCESS, procTime);
+
+                List<ChangeRecordIDType> ret = new 
ArrayList<ChangeRecordIDType>();
 
                 //fetch from database the highest known watermark
-                ValidateReplication.unsupportedAPICall();
-                return null;
+                ReplicationConfiguration FetchEdges = FetchEdges();
+
+                EntityManager em = PersistenceManager.getEntityManager();
+                EntityTransaction tx = em.getTransaction();
+                try {
+                        tx.begin();
+                        if (FetchEdges != null) {
+                                Iterator<String> it = 
FetchEdges.getCommunicationGraph().getNode().iterator();
+                                while (it.hasNext()) {
+                                        String nextNode = it.next();
+                                        if (!nextNode.equals(node)) {
+
+                                                Long id = (Long) 
em.createQuery("select e.originatingUSN from ChangeRecord e where e.nodeID = 
:node order by e.originatingUSN desc").setParameter("node", 
nextNode).setMaxResults(1).getSingleResult();
+                                                if (id == null) {
+                                                        id = 0L;
+                                                        //per the spec
+                                                }
+                                                ChangeRecordIDType x = new 
ChangeRecordIDType(nextNode, id);
+                                                ret.add(x);
+                                        }
+                                }
+                        }
+                        //dont forget this node
+                        Long id = (Long) em.createQuery("select (e.id) from 
ChangeRecord e where e.nodeID = :node  order by e.id 
desc").setParameter("node", node).setMaxResults(1).getSingleResult();
+                        if (id == null) {
+                                id = 0L;
+                        }
+                        ChangeRecordIDType x = new ChangeRecordIDType();
+                        x.setNodeID(node);
+                        x.setOriginatingUSN(id);
+                        ret.add(x);
+
+                        tx.rollback();
+                        long procTime = System.currentTimeMillis() - startTime;
+                        
serviceCounter.update(ReplicationQuery.GET_HIGHWATERMARKS, QueryStatus.SUCCESS, 
procTime);
+
+                } catch (Exception drfm) {
+                        throw new FatalErrorException(new 
ErrorMessage("E_fatalError", drfm.getMessage()));
+
+                } finally {
+                        if (tx.isActive()) {
+                                tx.rollback();
+                        }
+                        em.close();
+                }
+
+                return ret;
         }
 
+        /**
+         * this means that another node has a change and we need to pick up the
+         * change and apply it to our local database.
+         *
+         * @param body
+         * @throws DispositionReportFaultMessage
+         */
+        @Override
         public void notifyChangeRecordsAvailable(NotifyChangeRecordsAvailable 
body)
-                throws DispositionReportFaultMessage {
+             throws DispositionReportFaultMessage {
                 long startTime = System.currentTimeMillis();
                 long procTime = System.currentTimeMillis() - startTime;
                 
serviceCounter.update(ReplicationQuery.NOTIFY_CHANGERECORDSAVAILABLE,
-                        QueryStatus.SUCCESS, procTime);
+                     QueryStatus.SUCCESS, procTime);
                 //some other node just told us there's new records available, 
call
                 //getChangeRecords from the remote node asynch
 
-                ValidateReplication.unsupportedAPICall();
+                new 
ValidateReplication(null).validateNotifyChangeRecordsAvailable(body, ctx);
+
+                queue.add(body);
+
+                //ValidateReplication.unsupportedAPICall();
         }
+        private static Queue<NotifyChangeRecordsAvailable> queue = null;
 
+        /**
+         * transfers custody of an entity from node1/user1 to node2/user2
+         *
+         * @param body
+         * @throws DispositionReportFaultMessage
+         */
         public void transferCustody(TransferCustody body)
-                throws DispositionReportFaultMessage {
+             throws DispositionReportFaultMessage {
                 long startTime = System.currentTimeMillis();
 
-
                 //*this node is transfering data to another node
                 //body.getTransferOperationalInfo().
-
-
                 ValidateReplication.unsupportedAPICall();
 
                 EntityManager em = PersistenceManager.getEntityManager();
                 //EntityTransaction tx = em.getTransaction();
 
-
                 //The custodial node must verify that it has granted 
permission to transfer the entities identified and that this permission is 
still valid.  This operation is comprised of two steps:
                 //1.       Verification that the transferToken was issued by 
it, that it has not expired, that it represents the authority to transfer no 
more and no less than those entities identified by the businessKey and 
tModelKey elements and that all these entities are still valid and not yet 
transferred. The transferToken is invalidated if any of these conditions are 
not met.
                 //2.       If the conditions above are met, the custodial node 
will prevent any further changes to the entities identified by the businessKey 
and tModelKey elements identified. The entity will remain in this state until 
the replication stream indicates it has been successfully processed via the 
replication stream. 
-
                 //Upon successful verification of the custody transfer request 
by the custodial node, an empty message is returned by it indicating the 
success of the request and acknowledging the custody transfer. 
-
                 //Following the issue of the empty message, the custodial node 
will submit into the replication stream a changeRecordNewData providing in the 
operationalInfo, the nodeID accepting custody of the datum and the 
authorizedName of the publisher accepting ownership. The 
acknowledgmentRequested attribute of this change record MUST be set to "true".
                 //TODO enqueue Replication message
-
                 //Finally, the custodial node invalidates the transferToken in 
order to prevent additional calls of the transfer_entities API.
                 DiscardTransferToken dtt = new DiscardTransferToken();
                 dtt.setKeyBag(body.getKeyBag());
                 dtt.setTransferToken(body.getTransferToken());
                 new UDDICustodyTransferImpl().discardTransferToken(dtt);
         }
+
 }

http://git-wip-us.apache.org/repos/asf/juddi/blob/8b95902b/juddi-core/src/main/java/org/apache/juddi/config/Install.java
----------------------------------------------------------------------
diff --git a/juddi-core/src/main/java/org/apache/juddi/config/Install.java 
b/juddi-core/src/main/java/org/apache/juddi/config/Install.java
index cd83040..63cbf97 100644
--- a/juddi-core/src/main/java/org/apache/juddi/config/Install.java
+++ b/juddi-core/src/main/java/org/apache/juddi/config/Install.java
@@ -46,9 +46,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.juddi.ClassUtil;
 import org.apache.juddi.api.impl.UDDIInquiryImpl;
+import org.apache.juddi.api.impl.UDDIPublicationImpl;
 import org.apache.juddi.keygen.KeyGenerator;
 import org.apache.juddi.mapping.MappingApiToModel;
 import org.apache.juddi.model.UddiEntityPublisher;
+import org.apache.juddi.replication.ReplicationNotifier;
 import org.apache.juddi.v3.error.ErrorMessage;
 import org.apache.juddi.v3.error.FatalErrorException;
 import org.apache.juddi.v3.error.InvalidKeyPassedException;
@@ -56,6 +58,7 @@ import org.apache.juddi.v3.error.KeyUnavailableException;
 import org.apache.juddi.v3.error.ValueNotAllowedException;
 import org.apache.juddi.validation.ValidatePublish;
 import org.apache.juddi.validation.ValidateUDDIKey;
+import org.uddi.api_v3.SaveBusiness;
 import org.uddi.api_v3.SaveTModel;
 import org.uddi.api_v3.TModel;
 import org.uddi.v3_service.DispositionReportFaultMessage;
@@ -288,7 +291,7 @@ public class Install {
 
                 for (org.apache.juddi.model.BusinessService service : 
modelBusinessEntity.getBusinessServices()) {
                         
service.setAuthorizedName(rootPublisher.getAuthorizedName());
-                        service.setNodeId(nodeId);
+                        service.setNodeId(modelBusinessEntity.getNodeId());
                         service.setCreated(now);
                         service.setModified(now);
                         service.setModifiedIncludingChildren(now);
@@ -310,6 +313,9 @@ public class Install {
                 }
 
                 em.persist(modelBusinessEntity);
+                SaveBusiness sb = new SaveBusiness();
+                sb.getBusinessEntity().add(rootBusinessEntity);
+                
ReplicationNotifier.Enqueue(UDDIPublicationImpl.getChangeRecord(modelBusinessEntity,
 rootBusinessEntity, modelBusinessEntity.getNodeId()));
 
                 return modelBusinessEntity.getEntityKey();
 
@@ -485,6 +491,10 @@ public class Install {
                                         modelTModel.setNodeId(nodeId);
 
                                         em.persist(modelTModel);
+                                        
+                                        SaveTModel stm = new SaveTModel();
+                                        stm.getTModel().add(apiTModel);
+                                        
ReplicationNotifier.Enqueue(UDDIPublicationImpl.getChangeRecord(modelTModel, 
apiTModel, nodeId));
                                 }
 
                         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to