JUDDI-910 directed graph replication is now implemented, 3 instances of tomcat are now generated with juddi-tomcat build. JUDDI-923 change record browser added, soap methods added for obtaining only failed (to apply changes) records
Project: http://git-wip-us.apache.org/repos/asf/juddi/repo Commit: http://git-wip-us.apache.org/repos/asf/juddi/commit/c0354a44 Tree: http://git-wip-us.apache.org/repos/asf/juddi/tree/c0354a44 Diff: http://git-wip-us.apache.org/repos/asf/juddi/diff/c0354a44 Branch: refs/heads/master Commit: c0354a44fd16fb42ea8c33d0ffabe86f78e73e54 Parents: 2a24372 Author: Alex <[email protected]> Authored: Thu Jan 29 10:17:59 2015 -0500 Committer: Alex <[email protected]> Committed: Thu Jan 29 10:17:59 2015 -0500 ---------------------------------------------------------------------- juddi-core/pom.xml | 345 +-- .../main/java/org/apache/juddi/Registry.java | 2 +- .../org/apache/juddi/api/impl/JUDDIApiImpl.java | 109 +- .../juddi/api/impl/UDDIPublicationImpl.java | 6 +- .../juddi/api/impl/UDDIReplicationImpl.java | 263 +- .../org/apache/juddi/api/util/JUDDIQuery.java | 10 +- .../apache/juddi/mapping/MappingApiToModel.java | 11 +- .../org/apache/juddi/model/ChangeRecord.java | 20 +- .../juddi/replication/ReplicationNotifier.java | 135 +- .../org/apache/juddi/rmi/JUDDIApiService.java | 7 + .../juddi/api/impl/API_160_ReplicationTest.java | 2 + .../apache/juddi/api/runtime/juddiTestimpl.java | 8 + juddi-examples/more-uddi-samples/pom.xml | 210 +- .../samples/CompareByTModelInstanceInfoQOS.java | 2 +- .../org/apache/juddi/samples/EntryPoint.java | 465 +--- .../juddi/samples/EntryPointSingleNode.java | 471 ++++ .../juddi/samples/EntryPoitMultiNode.java | 81 + .../apache/juddi/samples/JuddiAdminService.java | 315 ++- .../apache/juddi/samples/UddiCreatebulk.java | 17 +- .../samples/UddiDigitalSignatureBusiness.java | 12 + .../apache/juddi/samples/UddiReplication.java | 39 +- .../org/apache/juddi/samples/UddiSubscribe.java | 22 +- .../samples/UddiSubscribeAssertionStatus.java | 22 +- .../juddi/samples/UddiSubscribeValidate.java | 14 + .../samples/UddiSubscriptionManagement.java | 13 + .../org/apache/juddi/samples/WadlImport.java | 14 +- .../org/apache/juddi/samples/WsdlImport.java | 12 +- 
.../resources/META-INF/simple-publish-uddi.xml | 26 + juddi-tomcat/build.xml | 70 + .../root_BusinessEntity.xml | 28 +- .../juddi_install_data_node3/UDDI_Publisher.xml | 22 + .../juddi_install_data_node3/UDDI_tModels.xml | 2362 ++++++++++++++++++ .../root_BusinessEntity.xml | 668 +++++ .../juddi_install_data_node3/root_Publisher.xml | 21 + .../root_tModelKeyGen.xml | 40 + juddi-tomcat/juddiv3Node2.xml | 2 +- juddi-tomcat/juddiv3Node3.xml | 230 ++ juddi-tomcat/pom.xml | 12 +- juddi-tomcat/serverNode2.xml | 2 +- juddi-tomcat/serverNode3.xml | 145 ++ juddi-tomcat/uddiNode3.xml | 151 ++ .../juddi/adminconsole/hub/UddiAdminHub.java | 60 +- .../juddi/adminconsole/resources/web.properties | 2 + .../adminconsole/resources/web_es.properties | 2 + juddiv3-war/src/main/webapp/admin/changes.jsp | 186 ++ juddiv3-war/src/main/webapp/admin/csrf.jsp | 15 +- juddiv3-war/src/main/webapp/admin/home.jsp | 18 + .../joepublisher/businessEntitySigned.xml | 88 + uddi-tck/pref-rpt-1422229264251.txt | 12 + .../tck/JUDDI_300_MultiNodeIntegrationTest.java | 10 +- .../v3/tck/UDDI_090_RMIIntegrationTest.java | 46 +- uddi-ws/pom.xml.orig | 85 + ...dReplicationChangeRecordsMessageRequest.java | 112 + ...ReplicationChangeRecordsMessageResponse.java | 77 + .../org/apache/juddi/api_v3/ObjectFactory.java | 154 +- .../juddi/v3_service/JUDDIApiPortType.java | 39 +- uddi-ws/src/main/resources/juddi_api_v1.wsdl | 50 + 57 files changed, 6313 insertions(+), 1049 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/juddi/blob/c0354a44/juddi-core/pom.xml ---------------------------------------------------------------------- diff --git a/juddi-core/pom.xml b/juddi-core/pom.xml index 335ecd4..48da9e5 100644 --- a/juddi-core/pom.xml +++ b/juddi-core/pom.xml @@ -1,123 +1,123 @@ <?xml version="1.0" encoding="UTF-8"?> <!-- * Copyright 2001-2009 The Apache Software Foundation. 
* * Licensed under - the Apache License, Version 2.0 (the "License"); * you may not use this file - except in compliance with the License. * You may obtain a copy of the License - at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by - applicable law or agreed to in writing, software * distributed under the - License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS - OF ANY KIND, either express or implied. * See the License for the specific - language governing permissions and * limitations under the License. * */ --> +the Apache License, Version 2.0 (the "License"); * you may not use this file +except in compliance with the License. * You may obtain a copy of the License +at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by +applicable law or agreed to in writing, software * distributed under the +License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS +OF ANY KIND, either express or implied. * See the License for the specific +language governing permissions and * limitations under the License. 
* */ --> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.juddi</groupId> - <artifactId>juddi-parent</artifactId> - <version>3.3.0-SNAPSHOT</version> - </parent> - <artifactId>juddi-core</artifactId> - <packaging>bundle</packaging> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.juddi</groupId> + <artifactId>juddi-parent</artifactId> + <version>3.3.0-SNAPSHOT</version> + </parent> + <artifactId>juddi-core</artifactId> + <packaging>bundle</packaging> - <name>jUDDI Core Services</name> - <build> - <plugins> - <plugin> - <groupId>org.apache.felix</groupId> - <artifactId>maven-bundle-plugin</artifactId> - <version>1.4.0</version> - <configuration> - <instructions> - <Export-Package>org.apache.juddi, org.apache.juddi.api, - org.apache.juddi.api.impl, - org.apache.juddi.api.util, - org.apache.juddi.config, org.apache.juddi.config, - org.apache.juddi.cryptor, org.apache.juddi.keygen, - org.apache.juddi.mapping, org.apache.juddi.model, - org.apache.juddi.query, org.apache.juddi.query.util, - org.apache.juddi.rmi, org.apache.juddi.subscription, - org.apache.juddi.subscription.notify, - org.apache.juddi.replication, - org.apache.juddi.validation, org.apache.juddi.validation.vsv, org.apache.juddi.v3.auth, - org.apache.juddi.v3.error</Export-Package> - <Include-Resource>juddi_install_data=src/main/resources/juddi_install_data, - src/main/resources/messages.properties</Include-Resource> - </instructions> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-antrun-plugin</artifactId> - <version>1.3</version> - <executions> - <execution> - <phase>generate-resources</phase> - <configuration> - <tasks> - <delete dir="juddi-derby-test-db" /> - </tasks> - </configuration> - <goals> - 
<goal>run</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>hibernate3-maven-plugin</artifactId> - <version>2.2</version> - <!-- <executions> <execution> <phase>process-classes</phase> <goals> - <goal>hbm2ddl</goal> </goals> </execution> </executions> --> - <configuration> - <components> - <component> - <name>hbm2ddl</name> - <implementation>jpaconfiguration</implementation> - </component> - </components> - <componentProperties> - <persistenceunit>juddiDatabase</persistenceunit> - <outputfilename>schema.ddl</outputfilename> - <drop>false</drop> - <create>true</create> - <export>false</export> - <format>true</format> - </componentProperties> - </configuration> - </plugin> - </plugins> - </build> - <dependencies> - <dependency> - <groupId>org.apache.juddi</groupId> - <artifactId>uddi-ws</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>org.apache.juddi</groupId> - <artifactId>juddi-client</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>org.apache.juddi</groupId> - <artifactId>uddi-tck-base</artifactId> - <version>${project.parent.version}</version> - <scope>test</scope> - </dependency> - <dependency> + <name>jUDDI Core Services</name> + <build> + <plugins> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <version>1.4.0</version> + <configuration> + <instructions> + <Export-Package>org.apache.juddi, org.apache.juddi.api, + org.apache.juddi.api.impl, + org.apache.juddi.api.util, + org.apache.juddi.config, org.apache.juddi.config, + org.apache.juddi.cryptor, org.apache.juddi.keygen, + org.apache.juddi.mapping, org.apache.juddi.model, + org.apache.juddi.query, org.apache.juddi.query.util, + org.apache.juddi.rmi, org.apache.juddi.subscription, + org.apache.juddi.subscription.notify, + org.apache.juddi.replication, + org.apache.juddi.validation, 
org.apache.juddi.validation.vsv, org.apache.juddi.v3.auth, + org.apache.juddi.v3.error</Export-Package> + <Include-Resource>juddi_install_data=src/main/resources/juddi_install_data, + src/main/resources/messages.properties</Include-Resource> + </instructions> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <version>1.3</version> + <executions> + <execution> + <phase>generate-resources</phase> + <configuration> + <tasks> + <delete dir="juddi-derby-test-db" /> + </tasks> + </configuration> + <goals> + <goal>run</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>hibernate3-maven-plugin</artifactId> + <version>2.2</version> + <!-- <executions> <execution> <phase>process-classes</phase> <goals> + <goal>hbm2ddl</goal> </goals> </execution> </executions> --> + <configuration> + <components> + <component> + <name>hbm2ddl</name> + <implementation>jpaconfiguration</implementation> + </component> + </components> + <componentProperties> + <persistenceunit>juddiDatabase</persistenceunit> + <outputfilename>schema.ddl</outputfilename> + <drop>false</drop> + <create>true</create> + <export>false</export> + <format>true</format> + </componentProperties> + </configuration> + </plugin> + </plugins> + </build> + <dependencies> + <dependency> + <groupId>org.apache.juddi</groupId> + <artifactId>uddi-ws</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.juddi</groupId> + <artifactId>juddi-client</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.juddi</groupId> + <artifactId>uddi-tck-base</artifactId> + <version>${project.parent.version}</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>commons-configuration</groupId> <artifactId>commons-configuration</artifactId> 
<version>1.9</version> </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> - <version>3.1</version> -</dependency> - <dependency> - <groupId>org.hibernate.javax.persistence</groupId> - <artifactId>hibernate-jpa-2.1-api</artifactId> - <version>1.0.0.Final</version> - </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>3.1</version> + </dependency> + <dependency> + <groupId>org.hibernate.javax.persistence</groupId> + <artifactId>hibernate-jpa-2.1-api</artifactId> + <version>1.0.0.Final</version> + </dependency> <dependency> <groupId>org.hibernate</groupId> <artifactId>hibernate-core</artifactId> @@ -139,64 +139,69 @@ <artifactId>cglib-nodep</artifactId> <version>2.1_3</version> </dependency> - <dependency> - <groupId>commons-codec</groupId> - <artifactId>commons-codec</artifactId> - <version>1.3</version> - </dependency> + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + <version>1.3</version> + </dependency> - <dependency> - <groupId>org.apache.geronimo.specs</groupId> - <artifactId>geronimo-jta_1.1_spec</artifactId> - <version>1.1.1</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.geronimo.javamail</groupId> - <artifactId>geronimo-javamail_1.4_mail</artifactId> - <version>1.8.3</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.derby</groupId> - <artifactId>derby</artifactId> - <version>10.5.3.0_1</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>mysql</groupId> - <artifactId>mysql-connector-java</artifactId> - <version>5.1.6</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>postgresql</groupId> - <artifactId>postgresql</artifactId> - <version>8.2-504.jdbc3</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.mockejb</groupId> - 
<artifactId>mockejb</artifactId> - <version>0.6-beta2</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>dumbster</groupId> - <artifactId>dumbster</artifactId> - <version>1.5</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>commons-collections</groupId> - <artifactId>commons-collections</artifactId> - <version>3.2</version> - </dependency> - <dependency> - <groupId>javax.servlet</groupId> - <artifactId>servlet-api</artifactId> - <version>2.4</version> - </dependency> - </dependencies> + <dependency> + <groupId>org.apache.geronimo.specs</groupId> + <artifactId>geronimo-jta_1.1_spec</artifactId> + <version>1.1.1</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.geronimo.javamail</groupId> + <artifactId>geronimo-javamail_1.4_mail</artifactId> + <version>1.8.3</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.derby</groupId> + <artifactId>derby</artifactId> + <version>10.5.3.0_1</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>mysql</groupId> + <artifactId>mysql-connector-java</artifactId> + <version>5.1.6</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>postgresql</groupId> + <artifactId>postgresql</artifactId> + <version>8.2-504.jdbc3</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockejb</groupId> + <artifactId>mockejb</artifactId> + <version>0.6-beta2</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>dumbster</groupId> + <artifactId>dumbster</artifactId> + <version>1.5</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>commons-collections</groupId> + <artifactId>commons-collections</artifactId> + <version>3.2</version> + </dependency> + <dependency> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + <version>2.4</version> + </dependency> + <dependency> + <groupId>org.audit4j</groupId> + 
<artifactId>audit4j-core</artifactId> + <version>2.2.0</version> + </dependency> + </dependencies> </project> http://git-wip-us.apache.org/repos/asf/juddi/blob/c0354a44/juddi-core/src/main/java/org/apache/juddi/Registry.java ---------------------------------------------------------------------- diff --git a/juddi-core/src/main/java/org/apache/juddi/Registry.java b/juddi-core/src/main/java/org/apache/juddi/Registry.java index eaaff53..4a1fff0 100644 --- a/juddi-core/src/main/java/org/apache/juddi/Registry.java +++ b/juddi-core/src/main/java/org/apache/juddi/Registry.java @@ -78,7 +78,7 @@ public class Registry { */ public synchronized static void start() throws ConfigurationException { if (registry == null) { - log.info("Starting jUDDI registry..."); + log.info("Starting jUDDI registry...This is node " + AppConfig.getConfiguration().getString(Property.JUDDI_NODE_ID, "")); registry = new Registry(); replicationNotifier = new ReplicationNotifier(); AppConfig.triggerReload(); http://git-wip-us.apache.org/repos/asf/juddi/blob/c0354a44/juddi-core/src/main/java/org/apache/juddi/api/impl/JUDDIApiImpl.java ---------------------------------------------------------------------- diff --git a/juddi-core/src/main/java/org/apache/juddi/api/impl/JUDDIApiImpl.java b/juddi-core/src/main/java/org/apache/juddi/api/impl/JUDDIApiImpl.java index 3395dc7..3654d29 100644 --- a/juddi-core/src/main/java/org/apache/juddi/api/impl/JUDDIApiImpl.java +++ b/juddi-core/src/main/java/org/apache/juddi/api/impl/JUDDIApiImpl.java @@ -26,8 +26,12 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import javax.jws.WebMethod; +import javax.jws.WebParam; +import javax.jws.WebResult; import javax.jws.WebService; +import javax.jws.soap.SOAPBinding; import javax.persistence.EntityManager; import javax.persistence.EntityTransaction; import javax.persistence.Query; @@ -57,6 +61,8 @@ import org.apache.juddi.api_v3.GetAllPublisherDetail; import 
org.apache.juddi.api_v3.GetClientSubscriptionInfoDetail; import org.apache.juddi.api_v3.GetEntityHistoryMessageRequest; import org.apache.juddi.api_v3.GetEntityHistoryMessageResponse; +import org.apache.juddi.api_v3.GetFailedReplicationChangeRecordsMessageRequest; +import org.apache.juddi.api_v3.GetFailedReplicationChangeRecordsMessageResponse; import org.apache.juddi.api_v3.GetPublisherDetail; import org.apache.juddi.api_v3.NodeDetail; import org.apache.juddi.api_v3.NodeList; @@ -113,7 +119,6 @@ import org.uddi.api_v3.SaveBusiness; import org.uddi.api_v3.SaveTModel; import org.uddi.api_v3.TModelInfo; import org.uddi.api_v3.TModelInfos; -import org.uddi.repl_v3.ChangeRecordDelete; import org.uddi.repl_v3.ChangeRecords; import org.uddi.repl_v3.CommunicationGraph; import org.uddi.repl_v3.Operator; @@ -409,10 +414,10 @@ public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortTy * tModel will no longer be able to use the tModel if jUDDI Option * Enforce referential Integrity is enabled.<br> * Required permission, you must be am administrator - * {@link Property#JUDDI_ENFORCE_REFERENTIAL_INTEGRITY}. In addition, - * tModels that are owned by another node via replication cannot be deleted using - * this method and will throw an exception - + * {@link Property#JUDDI_ENFORCE_REFERENTIAL_INTEGRITY}. 
In addition, + * tModels that are owned by another node via replication cannot be + * deleted using this method and will throw an exception + * * * @param body * @throws DispositionReportFaultMessage @@ -439,16 +444,18 @@ public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortTy if (obj == null) { throw new InvalidKeyPassedException(new ErrorMessage("errors.invalidkey.TModelNotFound", entityKey)); } - if (!obj.getNodeId().equals(node)) + if (!obj.getNodeId().equals(node)) { throw new InvalidKeyPassedException(new ErrorMessage("errors.invalidkey.TModelNodeOwner", entityKey + " this node " + node + " owning node " + obj.getNodeId())); + } em.remove(obj); - changes.add( UDDIPublicationImpl.getChangeRecord_deleteTModelDelete(entityKey, node)); - + changes.add(UDDIPublicationImpl.getChangeRecord_deleteTModelDelete(entityKey, node)); + } tx.commit(); - for (ChangeRecord cr: changes) + for (ChangeRecord cr : changes) { ReplicationNotifier.Enqueue(cr); + } long procTime = System.currentTimeMillis() - startTime; serviceCounter.update(JUDDIQuery.ADMIN_DELETE_TMODEL, QueryStatus.SUCCESS, procTime); @@ -1187,11 +1194,11 @@ public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortTy } tx.commit(); - for (TemporaryMailContainer t:notifications){ - USERFRIENDLYSMTPNotifier.notifySubscriptionDeleted(t); + for (TemporaryMailContainer t : notifications) { + USERFRIENDLYSMTPNotifier.notifySubscriptionDeleted(t); } - notifications.clear(); - notifications=null; + notifications.clear(); + notifications = null; long procTime = System.currentTimeMillis() - startTime; serviceCounter.update(JUDDIQuery.ADMIN_DELETE_SUB, QueryStatus.SUCCESS, procTime); @@ -1231,6 +1238,7 @@ public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortTy stm.getBusinessEntity().addAll(values.get(i).getBusinessEntity()); pub.saveBusiness(stm); } + //TODO replication? 
tx.commit(); long procTime = System.currentTimeMillis() - startTime; @@ -1274,7 +1282,7 @@ public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortTy stm.getTModel().addAll(values.get(i).getTModel()); pub.saveTModel(stm); } - + //TODO replication? tx.commit(); long procTime = System.currentTimeMillis() - startTime; serviceCounter.update(JUDDIQuery.ADMIN_SAVE_TMODEL, @@ -1361,7 +1369,10 @@ public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortTy } new ValidateReplication(publisher).validateSetReplicationNodes(replicationConfiguration, em, node); + //StringWriter sw = new StringWriter(); + //JAXB.marshal(replicationConfiguration, sw); org.apache.juddi.model.ReplicationConfiguration model = null; + logger.info(publisher.getAuthorizedName() + " is setting the replication config from " + getRequestorsIPAddress());// + " " + sw.toString()); try { model = (ReplicationConfiguration) em.createQuery("select c FROM ReplicationConfiguration c order by c.serialNumber desc").getSingleResult(); } catch (Exception ex) { @@ -1382,7 +1393,7 @@ public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortTy } else { //a config exists, remove it, add the new one //spec doesn't appear to mention if recording a change history on the config is required - //assuming not. + //assuming we'll keep it for now, might be useful later. 
//em.remove(model); oldConfig = new org.uddi.repl_v3.ReplicationConfiguration(); MappingModelToApi.mapReplicationConfiguration(model, oldConfig); @@ -1440,7 +1451,7 @@ public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortTy StringBuilder sql = new StringBuilder(); sql.append("select c from ReplicationConfiguration c order by c.serialNumber desc"); - sql.toString(); + //sql.toString(); Query qry = em.createQuery(sql.toString()); qry.setMaxResults(1); @@ -1457,7 +1468,7 @@ public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortTy throw drfm; } catch (Exception ex) { //possible that there is no config to return - logger.warn("Error caught, is there a replication config is avaiable? Returning a default config (no replication): " + ex.getMessage()); + //logger.warn("Error caught, is there a replication config is avaiable? Returning a default config (no replication): " + ex.getMessage()); logger.debug("Error caught, is there a replication config is avaiable? 
Returning a default config (no replication): ", ex); r.setCommunicationGraph(new CommunicationGraph()); @@ -1516,7 +1527,9 @@ public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortTy r.setMaximumTimeToGetChanges(BigInteger.ONE); r.setMaximumTimeToSyncRegistry(BigInteger.ONE); - // JAXB.marshal(r, System.out); + //StringWriter sw = new StringWriter(); + //JAXB.marshal(r, sw); + //logger.info("dumping returned replication config " + sw.toString()); return r; } @@ -1614,4 +1627,64 @@ public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortTy em.close(); } } + + /** + * {@inheritDoc } + * + * @param body + * @return + * @throws DispositionReportFaultMessage + * @throws RemoteException + */ + @Override + public GetFailedReplicationChangeRecordsMessageResponse getFailedReplicationChangeRecords( + GetFailedReplicationChangeRecordsMessageRequest body) + throws DispositionReportFaultMessage, RemoteException { + //public GetFailedReplicationChangeRecordsMessageResponse getFailedReplicationChangeRecords(GetFailedReplicationChangeRecordsMessageRequest body) throws DispositionReportFaultMessage, RemoteException { + long startTime = System.currentTimeMillis(); + if (body == null) { + throw new InvalidValueException(new ErrorMessage("errors.NullInput")); + } + EntityManager em = PersistenceManager.getEntityManager(); + EntityTransaction tx = em.getTransaction(); + try { + tx.begin(); + UddiEntityPublisher requestor = this.getEntityPublisher(em, body.getAuthInfo()); + if (!((Publisher) requestor).isAdmin()) { + throw new UserMismatchException(new ErrorMessage("errors.AdminReqd")); + } + if (body.getMaxRecords() <= 0) { + body.setMaxRecords(20); + } + if (body.getOffset() < 0) { + body.setOffset(0); + } + Query createQuery = em.createQuery("select m from ChangeRecord m where m.isAppliedLocally=false order by m.id DESC "); + createQuery.setMaxResults((int) body.getMaxRecords()); + createQuery.setFirstResult((int) body.getOffset()); + 
List<ChangeRecord> resultList = createQuery.getResultList(); + GetFailedReplicationChangeRecordsMessageResponse res = new GetFailedReplicationChangeRecordsMessageResponse(); + res.setChangeRecords(new ChangeRecords()); + for (ChangeRecord cr : resultList) { + res.getChangeRecords().getChangeRecord().add(MappingModelToApi.mapChangeRecord(cr)); + } + + tx.rollback(); + long procTime = System.currentTimeMillis() - startTime; + serviceCounter.update(JUDDIQuery.ADMIN_GET_FAILED_CRS, + QueryStatus.SUCCESS, procTime); + return res; + } catch (DispositionReportFaultMessage drfm) { + long procTime = System.currentTimeMillis() - startTime; + serviceCounter.update(JUDDIQuery.ADMIN_GET_FAILED_CRS, + QueryStatus.FAILED, procTime); + throw drfm; + + } finally { + if (tx.isActive()) { + tx.rollback(); + } + em.close(); + } + } } http://git-wip-us.apache.org/repos/asf/juddi/blob/c0354a44/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIPublicationImpl.java ---------------------------------------------------------------------- diff --git a/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIPublicationImpl.java b/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIPublicationImpl.java index 0139497..275a202 100644 --- a/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIPublicationImpl.java +++ b/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIPublicationImpl.java @@ -1281,9 +1281,9 @@ public class UDDIPublicationImpl extends AuthenticatedService implements UDDIPub return cr; } - public static ChangeRecord getChangeRecord(BusinessService modelBindingTemplate, org.uddi.api_v3.BusinessService api, String node) throws DispositionReportFaultMessage { + public static ChangeRecord getChangeRecord(BusinessService model, org.uddi.api_v3.BusinessService api, String node) throws DispositionReportFaultMessage { ChangeRecord cr = new ChangeRecord(); - cr.setEntityKey(modelBindingTemplate.getEntityKey()); + cr.setEntityKey(model.getEntityKey()); cr.setNodeID(node); 
cr.setRecordType(ChangeRecord.RecordType.ChangeRecordNewData); @@ -1292,7 +1292,7 @@ public class UDDIPublicationImpl extends AuthenticatedService implements UDDIPub crapi.setChangeRecordNewData(new ChangeRecordNewData()); crapi.getChangeRecordNewData().setBusinessService(api); crapi.getChangeRecordNewData().setOperationalInfo(new OperationalInfo()); - MappingModelToApi.mapOperationalInfo(modelBindingTemplate, crapi.getChangeRecordNewData().getOperationalInfo()); + MappingModelToApi.mapOperationalInfo(model, crapi.getChangeRecordNewData().getOperationalInfo()); StringWriter sw = new StringWriter(); JAXB.marshal(crapi, sw); try { http://git-wip-us.apache.org/repos/asf/juddi/blob/c0354a44/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java ---------------------------------------------------------------------- diff --git a/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java b/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java index 9a09308..7bcdf95 100644 --- a/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java +++ b/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java @@ -119,14 +119,16 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep Set<String> addedNodes = diffNodeList(oldnodes, newNodes); if (queue == null) { - queue = new ConcurrentLinkedQueue<String>(); + queue = new ConcurrentLinkedQueue<NotifyChangeRecordsAvailable>(); } for (String s : addedNodes) { if (!s.equals(node)) { logger.info("This node: " + node + ". 
New replication node queue for synchronization: " + s); //HighWaterMarkVectorType highWaterMarkVectorType = new HighWaterMarkVectorType(); //highWaterMarkVectorType.getHighWaterMark().add(new ChangeRecordIDType(s, 0L)); - queue.add(s); + HighWaterMarkVectorType highWaterMarkVectorType = new HighWaterMarkVectorType(); + highWaterMarkVectorType.getHighWaterMark().add(new ChangeRecordIDType(s, 0L)); + queue.add(new NotifyChangeRecordsAvailable(s, highWaterMarkVectorType)); } } @@ -201,7 +203,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep private synchronized void Init() { if (queue == null) { - queue = new ConcurrentLinkedQueue<String>(); + queue = new ConcurrentLinkedQueue<NotifyChangeRecordsAvailable>(); } timer = new PullTimerTask(); @@ -241,9 +243,9 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep } //ok someone told me there's a change available while (!queue.isEmpty()) { - String poll = queue.poll(); - if (poll != null && !poll.equalsIgnoreCase(node)) { - UDDIReplicationPortType replicationClient = getReplicationClient(poll); + NotifyChangeRecordsAvailable poll = queue.poll(); + if (poll != null && !poll.getNotifyingNode().equalsIgnoreCase(node)) { + UDDIReplicationPortType replicationClient = getReplicationClient(poll.getNotifyingNode()); if (replicationClient == null) { logger.fatal("unable to obtain a replication client to node " + poll); } else { @@ -256,31 +258,44 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep // logger.info("Node " + poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID() // + " USN " + poll.getChangesAvailable().getHighWaterMark().get(xx).getOriginatingUSN()); //} - int recordsreturned = 1; - while (recordsreturned > 0) { - GetChangeRecords body = new GetChangeRecords(); - body.setRequestingNode(node); - body.setResponseLimitCount(BigInteger.valueOf(20)); - - body.setChangesAlreadySeen(getLastChangeRecordFrom(poll)); - 
logger.info("fetching updates from " + poll + " since " + body.getChangesAlreadySeen().getHighWaterMark().get(0).getOriginatingUSN() + " items still in the queue: " + queue.size()); - - List<ChangeRecord> records - = replicationClient.getChangeRecords(body).getChangeRecord(); - //ok now we need to persist the change records - logger.info("Change records retrieved from " + poll + ", " + records.size()); - for (int i = 0; i < records.size(); i++) { - //logger.info("Change records retrieved " + records.get(i).getChangeID().getNodeID() + " USN " + records.get(i).getChangeID().getOriginatingUSN()); - PersistChangeRecord(records.get(i)); + Set<String> nodesHitThisCycle=new HashSet<String>(); + for (int xx = 0; xx < poll.getChangesAvailable().getHighWaterMark().size(); xx++) { + int recordsreturned = 21; + while (recordsreturned >= 20) { + if (nodesHitThisCycle.contains(poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID())) + { + logger.info("i've already hit the node " + poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID() + " this cycle, skipping"); + break; + } + if (poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID().equalsIgnoreCase(node)) { + logger.info("ignoring updates that were generated here " + poll.getChangesAvailable().getHighWaterMark().get(xx).getOriginatingUSN() + " sent by " + poll.getNotifyingNode() + " this node is " + node); + break; + } + nodesHitThisCycle.add(poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID()); + GetChangeRecords body = new GetChangeRecords(); + body.setRequestingNode(node); + body.setResponseLimitCount(BigInteger.valueOf(100L)); + + body.setChangesAlreadySeen(getLastChangeRecordFrom(poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID())); + logger.info("fetching updates from " + poll.getNotifyingNode() + " since " + body.getChangesAlreadySeen().getHighWaterMark().get(0).getNodeID() + ":" + body.getChangesAlreadySeen().getHighWaterMark().get(0).getOriginatingUSN() + 
", items still in the queue: " + queue.size()); + //JAXB.marshal(body, System.out); + List<ChangeRecord> records + = replicationClient.getChangeRecords(body).getChangeRecord(); + //ok now we need to persist the change records + logger.info("Change records retrieved from " + poll.getNotifyingNode() + ", " + records.size()); + for (int i = 0; i < records.size(); i++) { + logger.info("Change records retrieved " + records.get(i).getChangeID().getNodeID() + " USN " + records.get(i).getChangeID().getOriginatingUSN()); + PersistChangeRecord(records.get(i)); + } + recordsreturned = records.size(); } - recordsreturned = records.size(); } } catch (Exception ex) { logger.error("Error caught fetching replication changes from " + poll + " @" + ((BindingProvider) replicationClient).getRequestContext().get(BindingProvider.ENDPOINT_ADDRESS_PROPERTY), ex); } } } else { - logger.warn("weird, popped an object from the queue but it was null."); + logger.warn("strange, popped an object from the queue but it was null or it from myself, ignoring..."); } } } @@ -293,7 +308,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep /** * someone told me there's a change available, we retrieved it - * and are processing the changes locally + * and are processing the changes locally. * * @param rec */ @@ -309,6 +324,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep } EntityManager em = PersistenceManager.getEntityManager(); EntityTransaction tx = em.getTransaction(); + org.apache.juddi.model.ChangeRecord mapChangeRecord = null; /** * In nodes that support pre-bundled replication * responses, the recipient of the get_changeRecords @@ -321,16 +337,33 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep try { tx.begin(); - //the change record rec must also be persisted!! 
- org.apache.juddi.model.ChangeRecord mapChangeRecord = MappingApiToModel.mapChangeRecord(rec); + //check to see if we have this update already + Query createQuery = em.createQuery("select c from ChangeRecord c where c.nodeID=:node and c.originatingUSN=:oid"); + createQuery.setParameter("node", rec.getChangeID().getNodeID()); + createQuery.setParameter("oid", rec.getChangeID().getOriginatingUSN()); + Object existingrecord = null; + try { + existingrecord = createQuery.getSingleResult(); + } catch (Exception ex) { + logger.debug("error checking to see if change record exists already (expected failure)", ex); + } + if (existingrecord != null) { + logger.info("I've already processed change record " + rec.getChangeID().getNodeID() + " " + rec.getChangeID().getOriginatingUSN()); + return; + } + //if it didn't come from here and i haven't seen it yet + ReplicationNotifier.EnqueueRetransmit(rec); + //the remotechange record rec must also be persisted!! + mapChangeRecord = MappingApiToModel.mapChangeRecord(rec); mapChangeRecord.setId(null); + mapChangeRecord.setIsAppliedLocally(true); em.persist(mapChangeRecord); tx.commit(); - logger.debug("Remote CR saved, it was from " + mapChangeRecord.getNodeID() + logger.info("Remote CR saved, it was from " + mapChangeRecord.getNodeID() //this is the origin of the change + " USN:" + mapChangeRecord.getOriginatingUSN() + " Type:" + mapChangeRecord.getRecordType().name() + " Key:" + mapChangeRecord.getEntityKey() - + " Local id:" + mapChangeRecord.getId()); + + " Local id from sender:" + mapChangeRecord.getId()); tx = em.getTransaction(); tx.begin(); //<editor-fold defaultstate="collapsed" desc="delete a record"> @@ -423,6 +456,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep BusinessEntity model = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewData().getBusinessEntity().getBusinessKey()); if (model != null) { + //if the owner of the new data is me, and the update didn't 
originate from me if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(node) && !model.getNodeId().equals(node)) { if (model.getIsTransferInProgress()) { @@ -434,7 +468,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep em.merge(model); } else { //block it, unexpected transfer - throw new Exception("Unexpected entity transfer to this node from " + rec.getChangeID().getNodeID()); + throw new Exception("Unexpected entity transfer to to node " + node + " from " + rec.getChangeID().getNodeID()); } } else if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(node) @@ -455,7 +489,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep em.merge(model); } - + } else { model = new BusinessEntity(); MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model); @@ -739,10 +773,31 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep tx.commit(); } catch (Exception drfm) { - logger.warn("Error persisting change record!", drfm); + + logger.warn("Error applying the change record! ", drfm); StringWriter sw = new StringWriter(); JAXB.marshal(rec, sw); logger.warn("This is the record that failed to persist: " + sw.toString()); + if (tx.isActive()) { + tx.rollback(); + } + if (mapChangeRecord != null) { + //set the change record's isApplied to false + try { + tx = em.getTransaction(); + tx.begin(); + mapChangeRecord.setIsAppliedLocally(false); + em.merge(mapChangeRecord); + tx.commit(); + } catch (Exception e) { + logger.error("error updating change record!!", e); + if (tx.isActive()) { + tx.rollback(); + } + } + } else { + logger.fatal("whoa! 
change record is null when saving a remote change record, this is unexpected and should be reported"); + } } finally { if (tx.isActive()) { tx.rollback(); @@ -751,33 +806,35 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep } } - private HighWaterMarkVectorType getLastChangeRecordFrom(String notifyingNode) { + private HighWaterMarkVectorType getLastChangeRecordFrom(String sourcenode) { HighWaterMarkVectorType ret = new HighWaterMarkVectorType(); ChangeRecordIDType cid = new ChangeRecordIDType(); - cid.setNodeID(notifyingNode); + cid.setNodeID(sourcenode); cid.setOriginatingUSN(0L); EntityManager em = PersistenceManager.getEntityManager(); EntityTransaction tx = em.getTransaction(); try { tx.begin(); - Long id = 0L; + //Long id = 0L; try { - cid.setOriginatingUSN((Long) em.createQuery("select e.originatingUSN from ChangeRecord e where e.nodeID = :node order by e.originatingUSN desc").setParameter("node", notifyingNode).setMaxResults(1).getSingleResult()); + cid.setOriginatingUSN((Long) em.createQuery("select MAX(e.originatingUSN) from ChangeRecord e where e.nodeID = :node") + .setParameter("node", sourcenode) + .getSingleResult()); } catch (Exception ex) { - logger.info(ex); + logger.info("unexpected error searching for last record from " + sourcenode, ex); } tx.rollback(); } catch (Exception drfm) { - logger.warn("error caught fetching newest record from node " + notifyingNode, drfm); + logger.warn("error caught fetching newest record from node " + sourcenode, drfm); } finally { if (tx.isActive()) { tx.rollback(); } em.close(); } - + logger.info("Highest known record for " + sourcenode + " is " + cid.getOriginatingUSN()); ret.getHighWaterMark().add(cid); return ret; @@ -851,9 +908,10 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep EntityManager em = PersistenceManager.getEntityManager(); EntityTransaction tx = em.getTransaction(); try { + tx.begin(); StringBuilder sql = new StringBuilder(); 
sql.append("select c from ReplicationConfiguration c order by c.serialNumber desc"); - sql.toString(); + //sql.toString(); Query qry = em.createQuery(sql.toString()); qry.setMaxResults(1); @@ -933,7 +991,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep * recent change record that has been successfully processed by * the invocating node */ - int maxrecords = 100; + int maxrecords = 100; //TODO config this if (responseLimitCount != null) { maxrecords = responseLimitCount.intValue(); } @@ -941,66 +999,83 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep tx.begin(); Long firstrecord = 0L; Long lastrecord = null; - + Query createQuery = null; +//SELECT t0.id, t0.change_contents, t0.entity_key, t0.appliedlocal, t0.node_id, t0.orginating_usn, t0.record_type FROM j3_chg_record t0 WHERE (t0.id > NULL AND t0.node_id = ?) ORDER BY t0.id ASC if (changesAlreadySeen != null) { //this is basically a lower limit (i.e. the newest record that was processed by the requestor //therefore we want the oldest record stored locally to return to the requestor for processing for (int i = 0; i < changesAlreadySeen.getHighWaterMark().size(); i++) { + firstrecord = changesAlreadySeen.getHighWaterMark().get(i).getOriginatingUSN(); + if (firstrecord == null) { + firstrecord = 0L; + } if (changesAlreadySeen.getHighWaterMark().get(i).getNodeID().equals(node)) { - firstrecord = changesAlreadySeen.getHighWaterMark().get(i).getOriginatingUSN(); + //special case, search by database id + createQuery = em.createQuery("select e from ChangeRecord e where " + + "(e.id > :inbound AND e.nodeID = :node) " + + "order by e.id ASC"); + + } else { + createQuery = em.createQuery("select e from ChangeRecord e where " + + "e.originatingUSN > :inbound AND e.nodeID = :node " + + "order by e.originatingUSN ASC"); } - } - } - if (responseLimitVector != null) { - //using responseLimitVector, indicating for each node in the graph the first change 
originating there that he does not wish to be returned. - //upper limit basically - for (int i = 0; i < responseLimitVector.getHighWaterMark().size(); i++) { - if (responseLimitVector.getHighWaterMark().get(i).getNodeID().equals(node)) { - lastrecord = responseLimitVector.getHighWaterMark().get(i).getOriginatingUSN(); + logger.info("Query db for replication changes, lower index is " + (firstrecord) + " last index " + lastrecord + " record limit " + maxrecords); + logger.info("This node is" + node + ", request is for data originated from " + changesAlreadySeen.getHighWaterMark().get(i).getNodeID() + " and it's being sent back to " + requestingNode); + + createQuery.setMaxResults(maxrecords); + createQuery.setParameter("inbound", firstrecord); + createQuery.setParameter("node", changesAlreadySeen.getHighWaterMark().get(i).getNodeID()); + List<org.apache.juddi.model.ChangeRecord> records = (List<org.apache.juddi.model.ChangeRecord>) createQuery.getResultList(); + logger.info(records.size() + " CR records returned from query"); + for (int x = 0; x < records.size(); x++) { + ChangeRecord r = MappingModelToApi.mapChangeRecord(records.get(x)); + //if (!Excluded(changesAlreadySeen, r)) { + ret.add(r); + //} + } } - } - - logger.info("Query db for replication changes, lower index is " + (firstrecord) + " last index " + lastrecord + " record limit " + maxrecords); - Query createQuery = null; - /* - //this don't work - if (lastrecord != null) { - createQuery = em.createQuery("select e from ChangeRecord e where " - + "((e.id > :inbound AND e.nodeID = :node AND e.id < :lastrecord) OR " - + "(e.originatingUSN > :inbound AND e.nodeID <> :node AND e.originatingUSN < :lastrecord)) " - + "order by e.id ASC"); - createQuery.setParameter("lastrecord", lastrecord); - } else { - createQuery = em.createQuery("select e from ChangeRecord e where " - + "((e.id > :inbound AND e.nodeID = :node) OR " - + "(e.originatingUSN > :inbound AND e.nodeID <> :node)) " - + "order by e.id ASC"); - }*/ - 
if (lastrecord != null) { - createQuery = em.createQuery("select e from ChangeRecord e where " - + "(e.id > :inbound AND e.nodeID = :node AND e.id < :lastrecord) " - + "order by e.id ASC"); - createQuery.setParameter("lastrecord", lastrecord); - } else { - createQuery = em.createQuery("select e from ChangeRecord e where " - + "(e.id > :inbound AND e.nodeID = :node) " - + "order by e.id ASC"); - } - createQuery.setMaxResults(maxrecords); - createQuery.setParameter("inbound", firstrecord); - createQuery.setParameter("node", node); - - List<org.apache.juddi.model.ChangeRecord> records = (List<org.apache.juddi.model.ChangeRecord>) createQuery.getResultList(); - logger.info(records.size() + " CR records returned from query"); - for (int i = 0; i < records.size(); i++) { - ChangeRecord r = MappingModelToApi.mapChangeRecord(records.get(i)); - if (!Excluded(changesAlreadySeen, r)) { - ret.add(r); + } /*if (responseLimitVector != null) { + //using responseLimitVector, indicating for each node in the graph the first change originating there that he does not wish to be returned. + //upper limit basically + for (int i = 0; i < responseLimitVector.getHighWaterMark().size(); i++) { + //if (responseLimitVector.getHighWaterMark().get(i).getNodeID().equals(node)) { + lastrecord = responseLimitVector.getHighWaterMark().get(i).getOriginatingUSN(); + //} + } + }*/ else { + if (firstrecord == null) { + firstrecord = 0L; + } + //assume that they just want records that originated from here? 
+ logger.info("Query db for replication changes, lower index is " + (firstrecord) + " last index " + lastrecord + " record limit " + maxrecords); + logger.info("This node is" + node + " requesting node " + requestingNode); + + if (lastrecord != null) { + createQuery = em.createQuery("select e from ChangeRecord e where " + + "(e.id > :inbound AND e.nodeID = :node AND e.id < :lastrecord) " + + "order by e.id ASC"); + createQuery.setParameter("lastrecord", lastrecord); + } else { + createQuery = em.createQuery("select e from ChangeRecord e where " + + "(e.id > :inbound AND e.nodeID = :node) " + + "order by e.id ASC"); } + createQuery.setMaxResults(maxrecords); + createQuery.setParameter("inbound", firstrecord); + createQuery.setParameter("node", node); + + List<org.apache.juddi.model.ChangeRecord> records = (List<org.apache.juddi.model.ChangeRecord>) createQuery.getResultList(); + logger.info(records.size() + " CR records returned from query"); + for (int i = 0; i < records.size(); i++) { + ChangeRecord r = MappingModelToApi.mapChangeRecord(records.get(i)); + //if (!Excluded(changesAlreadySeen, r)) { + ret.add(r); + //} + } } - tx.rollback(); long procTime = System.currentTimeMillis() - startTime; serviceCounter.update(ReplicationQuery.GET_CHANGERECORDS, @@ -1120,14 +1195,14 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep new ValidateReplication(null).validateNotifyChangeRecordsAvailable(body, ctx); logger.info(body.getNotifyingNode() + " just told me that there are change records available, enqueuing...size is " + queue.size()); - if (!queue.contains(body.getNotifyingNode())) { - queue.add(body.getNotifyingNode()); - } + //if (!queue.contains(body.getNotifyingNode())) { + queue.add(body); + //} long procTime = System.currentTimeMillis() - startTime; serviceCounter.update(ReplicationQuery.NOTIFY_CHANGERECORDSAVAILABLE, QueryStatus.SUCCESS, procTime); } - private static Queue<String> queue = null; + private static 
Queue<NotifyChangeRecordsAvailable> queue = null; /** * transfers custody of an entity from node1/user1 to node2/user2 http://git-wip-us.apache.org/repos/asf/juddi/blob/c0354a44/juddi-core/src/main/java/org/apache/juddi/api/util/JUDDIQuery.java ---------------------------------------------------------------------- diff --git a/juddi-core/src/main/java/org/apache/juddi/api/util/JUDDIQuery.java b/juddi-core/src/main/java/org/apache/juddi/api/util/JUDDIQuery.java index df501a9..7951030 100644 --- a/juddi-core/src/main/java/org/apache/juddi/api/util/JUDDIQuery.java +++ b/juddi-core/src/main/java/org/apache/juddi/api/util/JUDDIQuery.java @@ -20,8 +20,7 @@ import java.util.Hashtable; import java.util.List; /** - * - * @author alex + * enumerated data type to help with MBean lookups */ public enum JUDDIQuery implements UDDIQuery { @@ -47,8 +46,8 @@ public enum JUDDIQuery implements UDDIQuery { GET_REPLICATION_NODES("get_replicationNodes"), ADMIN_SAVE_SUB("admin_saveClientSubscription"), ADMIN_GET_HISTORY("admin_getEntityHistory"), - INVOKE_SYNCSUB("invoke_synchSubscription"); - + INVOKE_SYNCSUB("invoke_synchSubscription"), + ADMIN_GET_FAILED_CRS("getFailedReplicationChangeRecords"); private String _query; private static Hashtable<String, JUDDIQuery> _inquiryQueries = null; @@ -88,7 +87,8 @@ public enum JUDDIQuery implements UDDIQuery { _inquiryQueries.put("admin_saveClientSubscription", JUDDIQuery.ADMIN_SAVE_SUB); _inquiryQueries.put("admin_getEntityHistory", JUDDIQuery.ADMIN_GET_HISTORY); _inquiryQueries.put("invoke_synchSubscription", JUDDIQuery.INVOKE_SYNCSUB); - + _inquiryQueries.put("getFailedReplicationChangeRecords", JUDDIQuery.ADMIN_GET_FAILED_CRS); + } } http://git-wip-us.apache.org/repos/asf/juddi/blob/c0354a44/juddi-core/src/main/java/org/apache/juddi/mapping/MappingApiToModel.java ---------------------------------------------------------------------- diff --git a/juddi-core/src/main/java/org/apache/juddi/mapping/MappingApiToModel.java 
b/juddi-core/src/main/java/org/apache/juddi/mapping/MappingApiToModel.java index 1dd95a4..1f6cdb3 100644 --- a/juddi-core/src/main/java/org/apache/juddi/mapping/MappingApiToModel.java +++ b/juddi-core/src/main/java/org/apache/juddi/mapping/MappingApiToModel.java @@ -1394,8 +1394,14 @@ public class MappingApiToModel { public static org.apache.juddi.model.ChangeRecord mapChangeRecord(ChangeRecord rec) throws UnsupportedEncodingException { org.apache.juddi.model.ChangeRecord r = new org.apache.juddi.model.ChangeRecord(); - r.setId(rec.getChangeID().getOriginatingUSN()); + //r.setId(rec.getChangeID().getOriginatingUSN()); r.setOriginatingUSN(rec.getChangeID().getOriginatingUSN()); + if (r.getOriginatingUSN()==null){ + logger.warn("strange, the getOriginatingUSN is null!!"); + JAXB.marshal(rec, System.out); + Thread.dumpStack(); + } + r.setNodeID(rec.getChangeID().getNodeID()); if (rec.getChangeRecordNewData() != null) { r.setRecordType(org.apache.juddi.model.ChangeRecord.RecordType.ChangeRecordNewData); r.setEntityKey(rec.getChangeRecordNewData().getOperationalInfo().getEntityKey()); @@ -1436,8 +1442,7 @@ public class MappingApiToModel { } else { throw new UnsupportedEncodingException("unknown type!"); } - r.setNodeID(rec.getChangeID().getNodeID()); - + StringWriter sw = new StringWriter(); JAXB.marshal(rec, sw); r.setContents(sw.toString().getBytes("UTF8")); http://git-wip-us.apache.org/repos/asf/juddi/blob/c0354a44/juddi-core/src/main/java/org/apache/juddi/model/ChangeRecord.java ---------------------------------------------------------------------- diff --git a/juddi-core/src/main/java/org/apache/juddi/model/ChangeRecord.java b/juddi-core/src/main/java/org/apache/juddi/model/ChangeRecord.java index e9113b4..b0de15c 100644 --- a/juddi-core/src/main/java/org/apache/juddi/model/ChangeRecord.java +++ b/juddi-core/src/main/java/org/apache/juddi/model/ChangeRecord.java @@ -25,9 +25,13 @@ import javax.persistence.Id; import javax.persistence.Lob; import 
javax.persistence.Table; import javax.persistence.TableGenerator; +import javax.persistence.UniqueConstraint; @Entity -@Table(name = "j3_chg_record") +@Table(name = "j3_chg_record" + //, uniqueConstraints = @UniqueConstraint(columnNames = {"node_id", "orginating_usn"}) +) + public class ChangeRecord implements Serializable { private static final long serialVersionUID = 1L; @@ -38,6 +42,7 @@ public class ChangeRecord implements Serializable { private RecordType e = RecordType.ChangeRecordNull; private Long id; private String entityKey; + private boolean appliedLocally = false; @Column(name = "change_contents") @Lob @@ -114,4 +119,17 @@ public class ChangeRecord implements Serializable { public void setOriginatingUSN(Long value) { this.originatingUSN = value; } + + /** + * returns true if the changes represented by this change record were applied successfully at this node + * @return + */ + @Column(name = "appliedlocal") + public boolean getIsAppliedLocally() { + return appliedLocally; + } + + public void setIsAppliedLocally(boolean value) { + this.appliedLocally = value; + } } http://git-wip-us.apache.org/repos/asf/juddi/blob/c0354a44/juddi-core/src/main/java/org/apache/juddi/replication/ReplicationNotifier.java ---------------------------------------------------------------------- diff --git a/juddi-core/src/main/java/org/apache/juddi/replication/ReplicationNotifier.java b/juddi-core/src/main/java/org/apache/juddi/replication/ReplicationNotifier.java index 23a8845..59d5b6a 100644 --- a/juddi-core/src/main/java/org/apache/juddi/replication/ReplicationNotifier.java +++ b/juddi-core/src/main/java/org/apache/juddi/replication/ReplicationNotifier.java @@ -16,6 +16,7 @@ */ package org.apache.juddi.replication; +import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; @@ -25,6 +26,8 @@ import java.util.Set; import java.util.Timer; import java.util.TimerTask; import 
java.util.concurrent.ConcurrentLinkedQueue; +import java.util.logging.Level; +import java.util.logging.Logger; import javax.persistence.EntityManager; import javax.persistence.EntityTransaction; import javax.persistence.Query; @@ -37,6 +40,7 @@ import org.apache.juddi.api_v3.Node; import org.apache.juddi.config.AppConfig; import org.apache.juddi.config.PersistenceManager; import org.apache.juddi.config.Property; +import org.apache.juddi.mapping.MappingApiToModel; import org.apache.juddi.mapping.MappingModelToApi; import org.apache.juddi.model.ChangeRecord; import org.apache.juddi.model.ReplicationConfiguration; @@ -91,34 +95,50 @@ public class ReplicationNotifier extends TimerTask { //ReplicationNotifier.Enqueue(this); public synchronized static void Enqueue(org.apache.juddi.model.ChangeRecord change) { if (queue == null) { - queue = new ConcurrentLinkedQueue<ChangeRecord>(); + queue = new ConcurrentLinkedQueue<org.apache.juddi.model.ChangeRecord>(); } queue.add(change); } - static Queue<ChangeRecord> queue; + + public synchronized static void EnqueueRetransmit(org.uddi.repl_v3.ChangeRecord change) { + if (queue2 == null) { + queue2 = new ConcurrentLinkedQueue<org.uddi.repl_v3.ChangeRecord>(); + } + queue2.add(change); + } + static Queue<org.apache.juddi.model.ChangeRecord> queue; + static Queue<org.uddi.repl_v3.ChangeRecord> queue2; /** + * Note: this is for locally originated changes only, see + * {@link org.apache.juddi.api.impl.UDDIReplicationImpl.PullTimerTask#PersistChangeRecord PersistChangeRecord + * } for how remote changes are processed * * @param j must be one of the UDDI save APIs + * */ - protected void ProcessChangeRecord(ChangeRecord j) { + protected void ProcessChangeRecord(org.apache.juddi.model.ChangeRecord j) { //store and convert the changes to database model + //TODO need a switch to send the notification without persisting the record + //this is to support multihop notifications EntityManager em = PersistenceManager.getEntityManager(); 
EntityTransaction tx = null; try { tx = em.getTransaction(); tx.begin(); - + j.setIsAppliedLocally(true); em.persist(j); - log.debug("CR saved locally, it was from " + j.getNodeID() + j.setOriginatingUSN(j.getId()); + em.merge(j); + log.info("CR saved locally, it was from " + j.getNodeID() + " USN:" + j.getOriginatingUSN() + " Type:" + j.getRecordType().name() + " Key:" + j.getEntityKey() + " Local id:" + j.getId()); tx.commit(); } catch (Exception ex) { - log.error("error", ex); + log.fatal("unable to store local change record locally!!", ex); if (tx != null && tx.isActive()) { tx.rollback(); } @@ -128,6 +148,12 @@ public class ReplicationNotifier extends TimerTask { } log.debug("ChangeRecord: " + j.getId() + "," + j.getEntityKey() + "," + j.getNodeID() + "," + j.getOriginatingUSN() + "," + j.getRecordType().toString()); + SendNotifications(j.getId(), j.getNodeID(), false); + + } + + private void SendNotifications(Long id, String origin_node, boolean isRetrans) { + org.uddi.repl_v3.ReplicationConfiguration repcfg = FetchEdges(); //TODO figure out what this statement means 7.5.3 @@ -142,10 +168,16 @@ public class ReplicationNotifier extends TimerTask { return; } + if (id==null || origin_node==null){ + log.fatal("Either the id is null or the origin_node is null. 
I can't send out this alert!!"); + //throw new Exception(node); + return; + } + Set<Object> destinationUrls = new HashSet<Object>(); if (repcfg.getCommunicationGraph() == null - || repcfg.getCommunicationGraph().getEdge().isEmpty()) { + || repcfg.getCommunicationGraph().getEdge().isEmpty() && !isRetrans) { //no edges or graph defined, default to the operator list for (Operator o : repcfg.getOperator()) { //no need to tell myself about a change at myself @@ -177,80 +209,78 @@ public class ReplicationNotifier extends TimerTask { } } } - if (container.primaryUrl!=null) + if (container.primaryUrl != null) { destinationUrls.add(container); + } } - + } } UDDIReplicationPortType x = uddiService.getUDDIReplicationPort(); - if (destinationUrls.isEmpty()) + if (destinationUrls.isEmpty()) { log.fatal("Something is bizarre with the replication config. I should have had at least one node to notify, but I have none!"); + } for (Object s : destinationUrls) { - + NotifyChangeRecordsAvailable req = new NotifyChangeRecordsAvailable(); req.setNotifyingNode(node); HighWaterMarkVectorType highWaterMarkVectorType = new HighWaterMarkVectorType(); - highWaterMarkVectorType.getHighWaterMark().add(new ChangeRecordIDType(node, j.getId())); + highWaterMarkVectorType.getHighWaterMark().add(new ChangeRecordIDType(origin_node, id)); req.setChangesAvailable(highWaterMarkVectorType); - - - if (s instanceof String) - SendNotification(x,(String)s, req); - else if (s instanceof PrimaryAlternate) - { + + if (s instanceof String) { + SendNotification(x, (String) s, req); + } else if (s instanceof PrimaryAlternate) { //more complex directed graph stuff - PrimaryAlternate pa = (PrimaryAlternate)s; - if (!SendNotification(x, pa.primaryUrl, req)) - { - for (String url : pa.alternateUrls){ - if (SendNotification(x, url, req)) + PrimaryAlternate pa = (PrimaryAlternate) s; + if (!SendNotification(x, pa.primaryUrl, req)) { + for (String url : pa.alternateUrls) { + if (SendNotification(x, url, req)) { break; + 
} //no need to continue to additional alternates } - } - else - { + } else { //primary url succeeded, no further action required } - + } - + //TODO the spec talks about control messages, should we even support it? seems pointless - - } + } /** * return true if successful + * * @param x * @param s * @param req - * @return + * @return */ private boolean SendNotification(UDDIReplicationPortType x, String s, NotifyChangeRecordsAvailable req) { - ((BindingProvider) x).getRequestContext().put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, s); - try { - x.notifyChangeRecordsAvailable(req); - log.debug("Successfully sent change record available message to " + s); - return true; - } catch (Exception ex) { - log.warn("Unable to send change notification to " + s); - log.debug("Unable to send change notification to " + s, ex); - } - return false; + ((BindingProvider) x).getRequestContext().put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, s); + try { + x.notifyChangeRecordsAvailable(req); + log.info("Successfully sent change record available message to " + s + " this node: " + node); + return true; + } catch (Exception ex) { + log.warn("Unable to send change notification to " + s + " this node: " + node); + log.debug("Unable to send change notification to " + s, ex); + } + return false; } class PrimaryAlternate { - String primaryUrl=null; - List<String> alternateUrls=new ArrayList<String>(); + String primaryUrl = null; + List<String> alternateUrls = new ArrayList<String>(); } public synchronized void run() { @@ -258,6 +288,9 @@ public class ReplicationNotifier extends TimerTask { if (queue == null) { queue = new ConcurrentLinkedQueue(); } + if (queue2 == null) { + queue2 = new ConcurrentLinkedQueue(); + } //TODO revisie this if (!queue.isEmpty()) { log.info("Replication, Notifying nodes of new change records. 
" + queue.size() + " queued"); @@ -271,6 +304,22 @@ public class ReplicationNotifier extends TimerTask { ProcessChangeRecord(j); } + + while (!queue2.isEmpty()) { + //for each change at this node + + org.uddi.repl_v3.ChangeRecord j = queue2.poll(); + + ChangeRecord model = new ChangeRecord(); + try { + model=MappingApiToModel.mapChangeRecord(j); + } catch (UnsupportedEncodingException ex) { + Logger.getLogger(ReplicationNotifier.class.getName()).log(Level.SEVERE, null, ex); + } + log.info("retransmitting CR notificationm entity owner: " + j.getChangeID().getNodeID() + " CR: " + j.getChangeID().getOriginatingUSN() + " key:" + model.getEntityKey() + " " + model.getRecordType().name() + " accepted locally:"+ model.getIsAppliedLocally()); + SendNotifications(j.getChangeID().getOriginatingUSN(), j.getChangeID().getNodeID(), true); + + } } /** --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
