Updated Branches: refs/heads/develop f3f2dc36d -> 67ec953dc
allow setting different multicast ports and addresses for clustered caches in the KiWi triplestore Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/487e3149 Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/487e3149 Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/487e3149 Branch: refs/heads/develop Commit: 487e3149e75e07118d3bab69f4cd46d87204447c Parents: 2eaf46f Author: Sebastian Schaffert <[email protected]> Authored: Mon Feb 3 13:07:47 2014 +0100 Committer: Sebastian Schaffert <[email protected]> Committed: Mon Feb 3 13:07:47 2014 +0100 ---------------------------------------------------------------------- .../marmotta/kiwi/caching/KiWiCacheManager.java | 74 ++++++--- .../marmotta/kiwi/config/KiWiConfiguration.java | 65 +++++++- .../src/main/resources/jgroups-kiwi.xml | 4 +- .../apache/marmotta/kiwi/test/ClusterTest.java | 158 +++++++++++++++++++ 4 files changed, 277 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/marmotta/blob/487e3149/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java index 5b86619..dfbd459 100644 --- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java +++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java @@ -17,8 +17,10 @@ */ package org.apache.marmotta.kiwi.caching; +import org.apache.commons.io.IOUtils; import org.apache.marmotta.kiwi.config.KiWiConfiguration; import org.infinispan.Cache; +import org.infinispan.commons.CacheException; import org.infinispan.commons.marshall.AdvancedExternalizer; import org.infinispan.configuration.cache.CacheMode; import org.infinispan.configuration.cache.Configuration; @@ -34,6 +36,7 @@ import org.infinispan.manager.EmbeddedCacheManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.Iterator; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -83,20 +86,45 @@ public class KiWiCacheManager { this.kiWiConfiguration = config; if(clustered) { - globalConfiguration = new GlobalConfigurationBuilder() - .classLoader(KiWiCacheManager.class.getClassLoader()) - .transport() - .defaultTransport() - .clusterName(config.getClusterName()) - .machineId("instance-" + config.getDatacenterId()) - .addProperty("configurationFile", "jgroups-kiwi.xml") - .globalJmxStatistics() - .jmxDomain("org.apache.marmotta.kiwi") - .allowDuplicateDomains(true) - .serialization() - .addAdvancedExternalizer(externalizers) - .build(); + try { + String jgroupsXml = IOUtils.toString(KiWiCacheManager.class.getResourceAsStream("/jgroups-kiwi.xml")); + + jgroupsXml = jgroupsXml.replaceAll("mcast_addr=\"[0-9.]+\"", String.format("mcast_addr=\"%s\"", config.getClusterAddress())); + jgroupsXml = jgroupsXml.replaceAll("mcast_port=\"[0-9]+\"", String.format("mcast_port=\"%d\"", config.getClusterPort())); + + + globalConfiguration = new GlobalConfigurationBuilder() + .classLoader(KiWiCacheManager.class.getClassLoader()) + .transport() + .defaultTransport() + .clusterName(config.getClusterName()) + .machineId("instance-" + config.getDatacenterId()) + .addProperty("configurationXml", jgroupsXml) + .globalJmxStatistics() + .jmxDomain("org.apache.marmotta.kiwi") + .allowDuplicateDomains(true) + .serialization() + .addAdvancedExternalizer(externalizers) + .build(); + } catch (IOException ex) { + log.warn("error loading JGroups configuration from archive: {}", ex.getMessage()); + log.warn("some configuration options will not be available"); + + globalConfiguration = new GlobalConfigurationBuilder() + .classLoader(KiWiCacheManager.class.getClassLoader()) + .transport() + .defaultTransport() + .clusterName(config.getClusterName()) + .machineId("instance-" + config.getDatacenterId()) + .addProperty("configurationFile", "jgroups-kiwi.xml") + .globalJmxStatistics() + .jmxDomain("org.apache.marmotta.kiwi") + .allowDuplicateDomains(true) + .serialization() + .addAdvancedExternalizer(externalizers) + .build(); + } defaultConfiguration = new ConfigurationBuilder() @@ -460,15 +488,19 @@ public class KiWiCacheManager { * Shutdown this cache manager instance. Will shutdown the underlying EHCache cache manager. */ public void shutdown() { - if(embedded && cacheManager.getStatus() == ComponentStatus.RUNNING) { - log.warn("shutting down cache manager ..."); - if(cacheManager.getTransport() != null) { - log.info("... shutting down transport ..."); - cacheManager.getTransport().stop(); + try { + if(embedded && cacheManager.getStatus() == ComponentStatus.RUNNING) { + log.warn("shutting down cache manager ..."); +// if(cacheManager.getTransport() != null) { +// log.info("... shutting down transport ..."); +// cacheManager.getTransport().stop(); +// } + log.info("... shutting down main component ..."); + cacheManager.stop(); + log.info("... done!"); } - log.info("... shutting down main component ..."); - cacheManager.stop(); - log.info("... done!"); + } catch (CacheException ex) { + log.warn("error shutting down cache: {}", ex.getMessage()); } } } http://git-wip-us.apache.org/repos/asf/marmotta/blob/487e3149/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/config/KiWiConfiguration.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/config/KiWiConfiguration.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/config/KiWiConfiguration.java index b94fc9b..8be3c76 100644 --- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/config/KiWiConfiguration.java +++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/config/KiWiConfiguration.java @@ -111,9 +111,20 @@ public class KiWiConfiguration { */ private boolean clustered = false; - + /** + * Name of the cache cluster this triplestore belongs to. + */ private String clusterName = "Marmotta"; + /** + * Multicast port of the cache cluster is listening on and distributing cache updates. + */ + private int clusterPort = 46655; + + /** + * Multicast address of the cache cluster is listening on and distributing cache updates. + */ + private String clusterAddress = "228.6.7.8"; public KiWiConfiguration(String name, String jdbcUrl, String dbUser, String dbPassword, KiWiDialect dialect) { this(name, jdbcUrl, dbUser, dbPassword, dialect, null, null); @@ -418,4 +429,56 @@ public class KiWiConfiguration { public void setClusterName(String clusterName) { this.clusterName = clusterName; } + + /** + * Return the multicast port used by the cache cluster this triplestore belongs to. This port number is + * used by JGroups to distribute and receive cache synchronization updates. Triplestores with different + * data should use different ports or addresses, + * <p/> + * Only used in case isClustered() is true + * + * @return + */ + public int getClusterPort() { + return clusterPort; + } + + /** + * Change the multicast port used by the cache cluster this triplestore belongs to. This port number is + * used by JGroups to distribute and receive cache synchronization updates. Triplestores with different + * data should use different ports or addresses. + * <p/> + * Only used in case isClustered() is true + * + * @return + */ + public void setClusterPort(int clusterPort) { + this.clusterPort = clusterPort; + } + + /** + * Return the multicast address used by the cache cluster this triplestore belongs to. This address is + * used by JGroups to distribute and receive cache synchronization updates. Triplestores with different + * data should use different ports or addresses, + * <p/> + * Only used in case isClustered() is true + * + * @return + */ + public String getClusterAddress() { + return clusterAddress; + } + + /** + * Change the multicast address used by the cache cluster this triplestore belongs to. This address is + * used by JGroups to distribute and receive cache synchronization updates. Triplestores with different + * data should use different ports or addresses, + * <p/> + * Only used in case isClustered() is true + * + * @return + */ + public void setClusterAddress(String clusterAddress) { + this.clusterAddress = clusterAddress; + } } http://git-wip-us.apache.org/repos/asf/marmotta/blob/487e3149/libraries/kiwi/kiwi-triplestore/src/main/resources/jgroups-kiwi.xml ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-triplestore/src/main/resources/jgroups-kiwi.xml b/libraries/kiwi/kiwi-triplestore/src/main/resources/jgroups-kiwi.xml index b0b05c5..51c1b65 100644 --- a/libraries/kiwi/kiwi-triplestore/src/main/resources/jgroups-kiwi.xml +++ b/libraries/kiwi/kiwi-triplestore/src/main/resources/jgroups-kiwi.xml @@ -19,8 +19,8 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.4.xsd"> <UDP - mcast_addr="${jgroups.udp.mcast_addr:228.6.7.8}" - mcast_port="${jgroups.udp.mcast_port:46655}" + mcast_addr="228.6.7.8" + mcast_port="46655" tos="8" ucast_recv_buf_size="20m" ucast_send_buf_size="640k" http://git-wip-us.apache.org/repos/asf/marmotta/blob/487e3149/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java new file mode 100644 index 0000000..cfff77b --- /dev/null +++ b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.marmotta.kiwi.test; + +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; +import org.apache.marmotta.kiwi.caching.KiWiCacheManager; +import org.apache.marmotta.kiwi.config.KiWiConfiguration; +import org.apache.marmotta.kiwi.persistence.h2.H2Dialect; +import org.apache.marmotta.kiwi.sail.KiWiStore; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.openrdf.model.URI; +import org.openrdf.repository.Repository; +import org.openrdf.repository.RepositoryException; +import org.openrdf.repository.sail.SailRepository; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Add file description here! + * + * @author Sebastian Schaffert ([email protected]) + */ +public class ClusterTest { + + private static Logger log = LoggerFactory.getLogger(ClusterTest.class); + + KiWiConfiguration config1, config2; + + KiWiStore store1, store2; + + Repository repository1, repository2; + + KiWiCacheManager cacheManager1, cacheManager2; + + @Before + public void setup() throws RepositoryException { + config1 = new KiWiConfiguration( + "default-H2", + "jdbc:h2:mem:kiwitest;MVCC=true;DB_CLOSE_ON_EXIT=TRUE;DB_CLOSE_DELAY=-1", + "kiwi", "kiwi", + new H2Dialect()); + config1.setDatacenterId(1); + config1.setClustered(true); + + config2 = new KiWiConfiguration( + "default-H2", + "jdbc:h2:mem:kiwitest;MVCC=true;DB_CLOSE_ON_EXIT=TRUE;DB_CLOSE_DELAY=-1", + "kiwi", "kiwi", + new H2Dialect()); + config2.setDatacenterId(2); + config2.setClustered(true); + + + + } + + public void setupCluster(int port1, int port2) throws RepositoryException { + config1.setClusterPort(port1); + config2.setClusterPort(port2); + + store1 = new KiWiStore(config1); + store2 = new KiWiStore(config2); + + repository1 = new SailRepository(store1); + repository2 = new SailRepository(store2); + + repository1.initialize(); + repository2.initialize(); + + cacheManager1 = store1.getPersistence().getCacheManager(); + cacheManager2 = store2.getPersistence().getCacheManager(); + } + + + @After + public void teardown() throws RepositoryException { + repository1.shutDown(); + repository2.shutDown(); + } + + + @Test + public void testClusteredCacheSync() throws InterruptedException, RepositoryException { + setupCluster(61222,61222); + + log.info("testing cache synchronization ..."); + + URI u = repository1.getValueFactory().createURI("http://localhost/test1"); + + + // give the cluster some time to performance asynchronous balancing + Thread.sleep(100); + + + log.debug("test if resource is in cache where it was created ..."); + URI u1 = (URI) cacheManager1.getUriCache().get(createCacheKey("http://localhost/test1")); + + Assert.assertNotNull(u1); + Assert.assertEquals(u,u1); + + log.debug("test if resource has been synced to other cache in cluster ..."); + URI u2 = (URI) cacheManager2.getUriCache().get(createCacheKey("http://localhost/test1")); + + Assert.assertNotNull(u2); + Assert.assertEquals(u,u2); + } + + @Test + public void testDisjointClusters() throws InterruptedException, RepositoryException { + setupCluster(61224,61225); + + log.info("testing caches on different ports ..."); + + URI u = repository1.getValueFactory().createURI("http://localhost/test1"); + + + // give the cluster some time to performance asynchronous balancing + Thread.sleep(100); + + log.debug("test if resource is in cache where it was created ..."); + URI u1 = (URI) cacheManager1.getUriCache().get(createCacheKey("http://localhost/test1")); + + Assert.assertNotNull(u1); + Assert.assertEquals(u,u1); + + log.debug("test if resource has been synced to other cache in cluster ..."); + URI u2 = (URI) cacheManager2.getUriCache().get(createCacheKey("http://localhost/test1")); + + Assert.assertNull(u2); + } + + + private static Long createCacheKey(String svalue) { + Hasher hasher = Hashing.goodFastHash(64).newHasher(); + hasher.putString(svalue); + return hasher.hash().asLong(); + } + +}
