Updated Branches:
  refs/heads/develop f3f2dc36d -> 67ec953dc

allow setting different multicast ports and addresses for clustered caches in 
the KiWi triplestore


Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/487e3149
Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/487e3149
Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/487e3149

Branch: refs/heads/develop
Commit: 487e3149e75e07118d3bab69f4cd46d87204447c
Parents: 2eaf46f
Author: Sebastian Schaffert <[email protected]>
Authored: Mon Feb 3 13:07:47 2014 +0100
Committer: Sebastian Schaffert <[email protected]>
Committed: Mon Feb 3 13:07:47 2014 +0100

----------------------------------------------------------------------
 .../marmotta/kiwi/caching/KiWiCacheManager.java |  74 ++++++---
 .../marmotta/kiwi/config/KiWiConfiguration.java |  65 +++++++-
 .../src/main/resources/jgroups-kiwi.xml         |   4 +-
 .../apache/marmotta/kiwi/test/ClusterTest.java  | 158 +++++++++++++++++++
 4 files changed, 277 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/marmotta/blob/487e3149/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java
----------------------------------------------------------------------
diff --git 
a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java
 
b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java
index 5b86619..dfbd459 100644
--- 
a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java
+++ 
b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java
@@ -17,8 +17,10 @@
  */
 package org.apache.marmotta.kiwi.caching;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.marmotta.kiwi.config.KiWiConfiguration;
 import org.infinispan.Cache;
+import org.infinispan.commons.CacheException;
 import org.infinispan.commons.marshall.AdvancedExternalizer;
 import org.infinispan.configuration.cache.CacheMode;
 import org.infinispan.configuration.cache.Configuration;
@@ -34,6 +36,7 @@ import org.infinispan.manager.EmbeddedCacheManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -83,20 +86,45 @@ public class KiWiCacheManager {
         this.kiWiConfiguration = config;
 
         if(clustered) {
-            globalConfiguration = new GlobalConfigurationBuilder()
-                    .classLoader(KiWiCacheManager.class.getClassLoader())
-                    .transport()
-                        .defaultTransport()
-                        .clusterName(config.getClusterName())
-                        .machineId("instance-" + config.getDatacenterId())
-                        .addProperty("configurationFile", "jgroups-kiwi.xml")
-                    .globalJmxStatistics()
-                        .jmxDomain("org.apache.marmotta.kiwi")
-                        .allowDuplicateDomains(true)
-                    .serialization()
-                        .addAdvancedExternalizer(externalizers)
-                    .build();
+            try {
+                String jgroupsXml = 
IOUtils.toString(KiWiCacheManager.class.getResourceAsStream("/jgroups-kiwi.xml"));
+
+                jgroupsXml = jgroupsXml.replaceAll("mcast_addr=\"[0-9.]+\"", 
String.format("mcast_addr=\"%s\"", config.getClusterAddress()));
+                jgroupsXml = jgroupsXml.replaceAll("mcast_port=\"[0-9]+\"", 
String.format("mcast_port=\"%d\"", config.getClusterPort()));
+
+
+                globalConfiguration = new GlobalConfigurationBuilder()
+                        .classLoader(KiWiCacheManager.class.getClassLoader())
+                        .transport()
+                            .defaultTransport()
+                            .clusterName(config.getClusterName())
+                            .machineId("instance-" + config.getDatacenterId())
+                            .addProperty("configurationXml", jgroupsXml)
+                        .globalJmxStatistics()
+                            .jmxDomain("org.apache.marmotta.kiwi")
+                            .allowDuplicateDomains(true)
+                        .serialization()
+                            .addAdvancedExternalizer(externalizers)
+                        .build();
+            } catch (IOException ex) {
+                log.warn("error loading JGroups configuration from archive: 
{}", ex.getMessage());
+                log.warn("some configuration options will not be available");
+
+                globalConfiguration = new GlobalConfigurationBuilder()
+                        .classLoader(KiWiCacheManager.class.getClassLoader())
+                            .transport()
+                            .defaultTransport()
+                            .clusterName(config.getClusterName())
+                            .machineId("instance-" + config.getDatacenterId())
+                            .addProperty("configurationFile", 
"jgroups-kiwi.xml")
+                        .globalJmxStatistics()
+                            .jmxDomain("org.apache.marmotta.kiwi")
+                            .allowDuplicateDomains(true)
+                        .serialization()
+                            .addAdvancedExternalizer(externalizers)
+                        .build();
 
+            }
 
 
             defaultConfiguration = new ConfigurationBuilder()
@@ -460,15 +488,19 @@ public class KiWiCacheManager {
      * Shutdown this cache manager instance. Will shutdown the underlying 
EHCache cache manager.
      */
     public void shutdown() {
-        if(embedded && cacheManager.getStatus() == ComponentStatus.RUNNING) {
-            log.warn("shutting down cache manager ...");
-            if(cacheManager.getTransport() != null) {
-                log.info("... shutting down transport ...");
-                cacheManager.getTransport().stop();
+        try {
+            if(embedded && cacheManager.getStatus() == 
ComponentStatus.RUNNING) {
+                log.warn("shutting down cache manager ...");
+//                if(cacheManager.getTransport() != null) {
+//                    log.info("... shutting down transport ...");
+//                    cacheManager.getTransport().stop();
+//                }
+                log.info("... shutting down main component ...");
+                cacheManager.stop();
+                log.info("... done!");
             }
-            log.info("... shutting down main component ...");
-            cacheManager.stop();
-            log.info("... done!");
+        } catch (CacheException ex) {
+            log.warn("error shutting down cache: {}", ex.getMessage());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/marmotta/blob/487e3149/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/config/KiWiConfiguration.java
----------------------------------------------------------------------
diff --git 
a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/config/KiWiConfiguration.java
 
b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/config/KiWiConfiguration.java
index b94fc9b..8be3c76 100644
--- 
a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/config/KiWiConfiguration.java
+++ 
b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/config/KiWiConfiguration.java
@@ -111,9 +111,20 @@ public class KiWiConfiguration {
      */
     private boolean clustered = false;
 
-
+    /**
+     * Name of the cache cluster this triplestore belongs to.
+     */
     private String clusterName = "Marmotta";
 
+    /**
+     * Multicast port of the cache cluster is listening on and distributing 
cache updates.
+     */
+    private int clusterPort = 46655;
+
+    /**
+     * Multicast address of the cache cluster is listening on and distributing 
cache updates.
+     */
+    private String clusterAddress = "228.6.7.8";
 
     public KiWiConfiguration(String name, String jdbcUrl, String dbUser, 
String dbPassword, KiWiDialect dialect) {
         this(name, jdbcUrl, dbUser, dbPassword, dialect, null, null);
@@ -418,4 +429,56 @@ public class KiWiConfiguration {
     public void setClusterName(String clusterName) {
         this.clusterName = clusterName;
     }
+
+    /**
+     * Return the multicast port used by the cache cluster this triplestore 
belongs to. This port number is
+     * used by JGroups to distribute and receive cache synchronization 
updates. Triplestores with different
+     * data should use different ports or addresses,
+     * <p/>
+     * Only used in case isClustered() is true
+     *
+     * @return
+     */
+    public int getClusterPort() {
+        return clusterPort;
+    }
+
+    /**
+     * Change the multicast port used by the cache cluster this triplestore 
belongs to. This port number is
+     * used by JGroups to distribute and receive cache synchronization 
updates. Triplestores with different
+     * data should use different ports or addresses.
+     * <p/>
+     * Only used in case isClustered() is true
+     *
+     * @return
+     */
+    public void setClusterPort(int clusterPort) {
+        this.clusterPort = clusterPort;
+    }
+
+    /**
+     * Return the multicast address used by the cache cluster this triplestore 
belongs to. This address is
+     * used by JGroups to distribute and receive cache synchronization 
updates. Triplestores with different
+     * data should use different ports or addresses,
+     * <p/>
+     * Only used in case isClustered() is true
+     *
+     * @return
+     */
+    public String getClusterAddress() {
+        return clusterAddress;
+    }
+
+    /**
+     * Change the multicast address used by the cache cluster this triplestore 
belongs to. This address is
+     * used by JGroups to distribute and receive cache synchronization 
updates. Triplestores with different
+     * data should use different ports or addresses,
+     * <p/>
+     * Only used in case isClustered() is true
+     *
+     * @return
+     */
+    public void setClusterAddress(String clusterAddress) {
+        this.clusterAddress = clusterAddress;
+    }
 }

http://git-wip-us.apache.org/repos/asf/marmotta/blob/487e3149/libraries/kiwi/kiwi-triplestore/src/main/resources/jgroups-kiwi.xml
----------------------------------------------------------------------
diff --git 
a/libraries/kiwi/kiwi-triplestore/src/main/resources/jgroups-kiwi.xml 
b/libraries/kiwi/kiwi-triplestore/src/main/resources/jgroups-kiwi.xml
index b0b05c5..51c1b65 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/resources/jgroups-kiwi.xml
+++ b/libraries/kiwi/kiwi-triplestore/src/main/resources/jgroups-kiwi.xml
@@ -19,8 +19,8 @@
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
         xsi:schemaLocation="urn:org:jgroups 
http://www.jgroups.org/schema/JGroups-3.4.xsd";>
    <UDP
-         mcast_addr="${jgroups.udp.mcast_addr:228.6.7.8}"
-         mcast_port="${jgroups.udp.mcast_port:46655}"
+         mcast_addr="228.6.7.8"
+         mcast_port="46655"
          tos="8"
          ucast_recv_buf_size="20m"
          ucast_send_buf_size="640k"

http://git-wip-us.apache.org/repos/asf/marmotta/blob/487e3149/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java
----------------------------------------------------------------------
diff --git 
a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java
 
b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java
new file mode 100644
index 0000000..cfff77b
--- /dev/null
+++ 
b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.marmotta.kiwi.test;
+
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.marmotta.kiwi.caching.KiWiCacheManager;
+import org.apache.marmotta.kiwi.config.KiWiConfiguration;
+import org.apache.marmotta.kiwi.persistence.h2.H2Dialect;
+import org.apache.marmotta.kiwi.sail.KiWiStore;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.openrdf.model.URI;
+import org.openrdf.repository.Repository;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.repository.sail.SailRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Add file description here!
+ *
+ * @author Sebastian Schaffert ([email protected])
+ */
+public class ClusterTest {
+
+    private static Logger log = LoggerFactory.getLogger(ClusterTest.class);
+
+    KiWiConfiguration config1, config2;
+
+    KiWiStore store1, store2;
+
+    Repository repository1, repository2;
+
+    KiWiCacheManager cacheManager1, cacheManager2;
+
+    @Before
+    public void setup() throws RepositoryException {
+        config1 = new KiWiConfiguration(
+                "default-H2",
+                
"jdbc:h2:mem:kiwitest;MVCC=true;DB_CLOSE_ON_EXIT=TRUE;DB_CLOSE_DELAY=-1",
+                "kiwi", "kiwi",
+                new H2Dialect());
+        config1.setDatacenterId(1);
+        config1.setClustered(true);
+
+        config2 = new KiWiConfiguration(
+                "default-H2",
+                
"jdbc:h2:mem:kiwitest;MVCC=true;DB_CLOSE_ON_EXIT=TRUE;DB_CLOSE_DELAY=-1",
+                "kiwi", "kiwi",
+                new H2Dialect());
+        config2.setDatacenterId(2);
+        config2.setClustered(true);
+
+
+
+    }
+
+    public void setupCluster(int port1, int port2) throws RepositoryException {
+        config1.setClusterPort(port1);
+        config2.setClusterPort(port2);
+
+        store1 = new KiWiStore(config1);
+        store2 = new KiWiStore(config2);
+
+        repository1 = new SailRepository(store1);
+        repository2 = new SailRepository(store2);
+
+        repository1.initialize();
+        repository2.initialize();
+
+        cacheManager1 = store1.getPersistence().getCacheManager();
+        cacheManager2 = store2.getPersistence().getCacheManager();
+    }
+
+
+    @After
+    public void teardown() throws RepositoryException {
+        repository1.shutDown();
+        repository2.shutDown();
+    }
+
+
+    @Test
+    public void testClusteredCacheSync() throws InterruptedException, 
RepositoryException {
+        setupCluster(61222,61222);
+
+        log.info("testing cache synchronization ...");
+
+        URI u = 
repository1.getValueFactory().createURI("http://localhost/test1";);
+
+
+        // give the cluster some time to performance asynchronous balancing
+        Thread.sleep(100);
+
+
+        log.debug("test if resource is in cache where it was created ...");
+        URI u1 = (URI) 
cacheManager1.getUriCache().get(createCacheKey("http://localhost/test1";));
+
+        Assert.assertNotNull(u1);
+        Assert.assertEquals(u,u1);
+
+        log.debug("test if resource has been synced to other cache in cluster 
...");
+        URI u2 = (URI) 
cacheManager2.getUriCache().get(createCacheKey("http://localhost/test1";));
+
+        Assert.assertNotNull(u2);
+        Assert.assertEquals(u,u2);
+    }
+
+    @Test
+    public void testDisjointClusters() throws InterruptedException, 
RepositoryException {
+        setupCluster(61224,61225);
+
+        log.info("testing caches on different ports ...");
+
+        URI u = 
repository1.getValueFactory().createURI("http://localhost/test1";);
+
+
+        // give the cluster some time to performance asynchronous balancing
+        Thread.sleep(100);
+
+        log.debug("test if resource is in cache where it was created ...");
+        URI u1 = (URI) 
cacheManager1.getUriCache().get(createCacheKey("http://localhost/test1";));
+
+        Assert.assertNotNull(u1);
+        Assert.assertEquals(u,u1);
+
+        log.debug("test if resource has been synced to other cache in cluster 
...");
+        URI u2 = (URI) 
cacheManager2.getUriCache().get(createCacheKey("http://localhost/test1";));
+
+        Assert.assertNull(u2);
+    }
+
+
+    private static Long createCacheKey(String svalue) {
+        Hasher hasher = Hashing.goodFastHash(64).newHasher();
+        hasher.putString(svalue);
+        return hasher.hash().asLong();
+    }
+
+}

Reply via email to