- implementation of Hazelcast cache backend - common base class for cache cluster tests
Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/62c44dea Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/62c44dea Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/62c44dea Branch: refs/heads/develop Commit: 62c44deafc9cf6332087368a8372dad37cb3032f Parents: 27a0523 Author: Sebastian Schaffert <[email protected]> Authored: Mon Mar 3 22:30:09 2014 +0100 Committer: Sebastian Schaffert <[email protected]> Committed: Mon Mar 3 22:30:09 2014 +0100 ---------------------------------------------------------------------- .../org/apache/marmotta/commons/io/DataIO.java | 11 +- libraries/kiwi/kiwi-caching-hazelcast/pom.xml | 59 +++++ .../caching/HazelcastCacheManager.java | 130 +++++++++- .../marmotta/kiwi/hazelcast/util/AsyncMap.java | 102 ++++++++ .../kiwi/test/cluster/HazelcastClusterTest.java | 36 +++ .../kiwi/test/cluster/SerializerTest.java | 241 +++++++++++++++++++ .../InfinispanEmbeddedCacheManager.java | 6 +- .../apache/marmotta/kiwi/test/ClusterTest.java | 160 ------------ .../kiwi/test/cluster/BaseClusterTest.java | 160 ++++++++++++ 9 files changed, 727 insertions(+), 178 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/marmotta/blob/62c44dea/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/io/DataIO.java ---------------------------------------------------------------------- diff --git a/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/io/DataIO.java b/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/io/DataIO.java index d236ef7..ca4b53d 100644 --- a/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/io/DataIO.java +++ b/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/io/DataIO.java @@ -33,7 +33,9 @@ public class DataIO { public static void writeString(DataOutput out, String s) throws IOException { if(s != null) 
{ out.writeInt(s.length()); - out.writeChars(s); + for(int i=0; i<s.length(); i++) { + out.writeChar(s.charAt(i)); + } } else { out.writeInt(-1); } @@ -44,11 +46,12 @@ public class DataIO { int len = in.readInt(); if(len >= 0) { - StringBuilder builder = new StringBuilder(); + char[] result = new char[len]; + for(int i=0; i<len; i++) { - builder.append(in.readChar()); + result[i] = in.readChar(); } - return builder.toString(); + return new String(result); } else { return null; } http://git-wip-us.apache.org/repos/asf/marmotta/blob/62c44dea/libraries/kiwi/kiwi-caching-hazelcast/pom.xml ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-caching-hazelcast/pom.xml b/libraries/kiwi/kiwi-caching-hazelcast/pom.xml index 833fece..b1f406a 100644 --- a/libraries/kiwi/kiwi-caching-hazelcast/pom.xml +++ b/libraries/kiwi/kiwi-caching-hazelcast/pom.xml @@ -48,6 +48,65 @@ <groupId>com.hazelcast</groupId> <artifactId>hazelcast</artifactId> </dependency> + + + <!-- Testing --> + <dependency> + <artifactId>junit</artifactId> + <groupId>junit</groupId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.marmotta</groupId> + <artifactId>kiwi-triplestore</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <artifactId>hamcrest-core</artifactId> + <groupId>org.hamcrest</groupId> + <scope>test</scope> + </dependency> + <dependency> + <artifactId>hamcrest-library</artifactId> + <groupId>org.hamcrest</groupId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.h2database</groupId> + <artifactId>h2</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.postgresql</groupId> + 
<artifactId>postgresql</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>mysql</groupId> + <artifactId>mysql-connector-java</artifactId> + <scope>test</scope> + <optional>true</optional> <!-- GPL licensed, no dependency --> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-rio-api</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-rio-rdfxml</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/marmotta/blob/62c44dea/libraries/kiwi/kiwi-caching-hazelcast/src/main/java/org/apache/marmotta/kiwi/hazelcast/caching/HazelcastCacheManager.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-caching-hazelcast/src/main/java/org/apache/marmotta/kiwi/hazelcast/caching/HazelcastCacheManager.java b/libraries/kiwi/kiwi-caching-hazelcast/src/main/java/org/apache/marmotta/kiwi/hazelcast/caching/HazelcastCacheManager.java index 446fa4b..cc037c9 100644 --- a/libraries/kiwi/kiwi-caching-hazelcast/src/main/java/org/apache/marmotta/kiwi/hazelcast/caching/HazelcastCacheManager.java +++ b/libraries/kiwi/kiwi-caching-hazelcast/src/main/java/org/apache/marmotta/kiwi/hazelcast/caching/HazelcastCacheManager.java @@ -18,11 +18,19 @@ package org.apache.marmotta.kiwi.hazelcast.caching; import com.hazelcast.config.Config; +import com.hazelcast.config.MapConfig; +import com.hazelcast.config.MaxSizeConfig; import com.hazelcast.config.SerializerConfig; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; import org.apache.marmotta.kiwi.caching.CacheManager; +import org.apache.marmotta.kiwi.config.CacheMode; import org.apache.marmotta.kiwi.config.KiWiConfiguration; import org.apache.marmotta.kiwi.hazelcast.serializer.*; +import org.apache.marmotta.kiwi.hazelcast.util.AsyncMap; import 
org.apache.marmotta.kiwi.model.rdf.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Map; @@ -33,16 +41,56 @@ import java.util.Map; */ public class HazelcastCacheManager implements CacheManager { + public static final String TRIPLE_CACHE = "triple-cache"; + public static final String URI_CACHE = "uri-cache"; + public static final String BNODE_CACHE = "bnode-cache"; + public static final String LITERAL_CACHE = "literal-cache"; + public static final String NODE_CACHE = "node-cache"; + public static final String NS_PREFIX_CACHE = "ns-prefix-cache"; + public static final String NS_URI_CACHE = "ns-uri-cache"; + public static final String REGISTRY_CACHE = "registry-cache"; + + private static Logger log = LoggerFactory.getLogger(HazelcastCacheManager.class); + private KiWiConfiguration configuration; private Config hcConfiguration; + private HazelcastInstance hazelcast; + + private AsyncMap<Long,KiWiNode> nodeCache; + private AsyncMap<Long,KiWiTriple> tripleCache; + private AsyncMap<String,KiWiUriResource> uriCache; + private AsyncMap<String,KiWiAnonResource> bnodeCache; + private AsyncMap<String,KiWiLiteral> literalCache; + private AsyncMap<String,KiWiNamespace> nsPrefixCache; + private AsyncMap<String,KiWiNamespace> nsUriCache; + + private Map<Long,Long> registryCache; + public HazelcastCacheManager(KiWiConfiguration configuration) { this.configuration = configuration; hcConfiguration = new Config(); + hcConfiguration.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(true); + hcConfiguration.getNetworkConfig().getJoin().getMulticastConfig().setMulticastPort(configuration.getClusterPort()); + hcConfiguration.getNetworkConfig().getJoin().getMulticastConfig().setMulticastGroup(configuration.getClusterAddress()); + hcConfiguration.getGroupConfig().setName(configuration.getClusterName()); + + + setupSerializers(); + setupCaches(); + + hazelcast = Hazelcast.newHazelcastInstance(hcConfiguration); + + + log.info("initialised Hazelcast 
distributed cache manager (cluster name: {})", configuration.getClusterName()); + + if(configuration.getCacheMode() != CacheMode.DISTRIBUTED) { + log.warn("Hazelcast only supports distributed cache mode (mode configuration was {})", configuration.getCacheMode()); + } } private void setupSerializers() { @@ -71,6 +119,30 @@ public class HazelcastCacheManager implements CacheManager { hcConfiguration.getSerializationConfig().addSerializerConfig(scUri); } + private void setupCaches() { + setupMapConfig(NODE_CACHE, configuration.getNodeCacheSize()); + setupMapConfig(TRIPLE_CACHE, configuration.getTripleCacheSize()); + setupMapConfig(URI_CACHE, configuration.getUriCacheSize()); + setupMapConfig(BNODE_CACHE, configuration.getBNodeCacheSize()); + setupMapConfig(LITERAL_CACHE, configuration.getLiteralCacheSize()); + setupMapConfig(NS_PREFIX_CACHE, configuration.getNamespaceCacheSize()); + setupMapConfig(NS_URI_CACHE, configuration.getNamespaceCacheSize()); + + } + + + private void setupMapConfig(String name, int size) { + MapConfig cfg = new MapConfig(NODE_CACHE); + cfg.setMaxSizeConfig(new MaxSizeConfig(size, MaxSizeConfig.MaxSizePolicy.PER_PARTITION)); + cfg.setAsyncBackupCount(1); + cfg.setBackupCount(0); + cfg.setEvictionPolicy(MapConfig.EvictionPolicy.LRU); + cfg.setMaxIdleSeconds(600); // 10 minutes + cfg.setTimeToLiveSeconds(3600); // 1 hour + + hcConfiguration.addMapConfig(cfg); + } + /** * Return the node id -> node cache from the cache manager. This cache is heavily used to lookup * nodes when querying or loading triples and should therefore have a decent size (default 500.000 elements). 
@@ -79,7 +151,11 @@ public class HazelcastCacheManager implements CacheManager { */ @Override public Map<Long, KiWiNode> getNodeCache() { - return null; + if(nodeCache == null) { + nodeCache = new AsyncMap<>(hazelcast.<Long,KiWiNode>getMap(NODE_CACHE)); + } + + return nodeCache; } /** @@ -90,7 +166,11 @@ public class HazelcastCacheManager implements CacheManager { */ @Override public Map<Long, KiWiTriple> getTripleCache() { - return null; + if(tripleCache == null) { + tripleCache = new AsyncMap<>(hazelcast.<Long,KiWiTriple>getMap(TRIPLE_CACHE)); + } + + return tripleCache; } /** @@ -101,7 +181,11 @@ public class HazelcastCacheManager implements CacheManager { */ @Override public Map<String, KiWiUriResource> getUriCache() { - return null; + if(uriCache == null) { + uriCache = new AsyncMap<>(hazelcast.<String,KiWiUriResource>getMap(URI_CACHE)); + } + + return uriCache; } /** @@ -112,7 +196,11 @@ public class HazelcastCacheManager implements CacheManager { */ @Override public Map<String, KiWiAnonResource> getBNodeCache() { - return null; + if(bnodeCache == null) { + bnodeCache = new AsyncMap<>(hazelcast.<String,KiWiAnonResource>getMap(BNODE_CACHE)); + } + + return bnodeCache; } /** @@ -124,7 +212,11 @@ public class HazelcastCacheManager implements CacheManager { */ @Override public Map<String, KiWiLiteral> getLiteralCache() { - return null; + if(literalCache == null) { + literalCache = new AsyncMap<>(hazelcast.<String,KiWiLiteral>getMap(LITERAL_CACHE)); + } + + return literalCache; } /** @@ -134,7 +226,11 @@ public class HazelcastCacheManager implements CacheManager { */ @Override public Map<String, KiWiNamespace> getNamespaceUriCache() { - return null; + if(nsUriCache == null) { + nsUriCache = new AsyncMap<>(hazelcast.<String,KiWiNamespace>getMap(NS_URI_CACHE)); + } + + return nsUriCache; } /** @@ -144,7 +240,11 @@ public class HazelcastCacheManager implements CacheManager { */ @Override public Map<String, KiWiNamespace> getNamespacePrefixCache() { - return null; + 
if(nsPrefixCache == null) { + nsPrefixCache = new AsyncMap<>(hazelcast.<String,KiWiNamespace>getMap(NS_PREFIX_CACHE)); + } + + return nsPrefixCache; } /** @@ -155,7 +255,11 @@ public class HazelcastCacheManager implements CacheManager { */ @Override public Map<Long, Long> getRegistryCache() { - return null; + if(registryCache == null) { + registryCache = hazelcast.<Long, Long>getMap(REGISTRY_CACHE); + } + + return registryCache; } /** @@ -167,7 +271,7 @@ public class HazelcastCacheManager implements CacheManager { */ @Override public Map getCacheByName(String name) { - return null; + return hazelcast.getMap(name); } /** @@ -175,7 +279,11 @@ public class HazelcastCacheManager implements CacheManager { */ @Override public void clear() { - + for(Map m : new Map[] { nodeCache, tripleCache, uriCache, bnodeCache, literalCache, nsPrefixCache, nsUriCache, registryCache}) { + if(m != null) { + m.clear(); + } + } } /** @@ -183,6 +291,6 @@ public class HazelcastCacheManager implements CacheManager { */ @Override public void shutdown() { - + hazelcast.shutdown(); } } http://git-wip-us.apache.org/repos/asf/marmotta/blob/62c44dea/libraries/kiwi/kiwi-caching-hazelcast/src/main/java/org/apache/marmotta/kiwi/hazelcast/util/AsyncMap.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-caching-hazelcast/src/main/java/org/apache/marmotta/kiwi/hazelcast/util/AsyncMap.java b/libraries/kiwi/kiwi-caching-hazelcast/src/main/java/org/apache/marmotta/kiwi/hazelcast/util/AsyncMap.java new file mode 100644 index 0000000..7aa4722 --- /dev/null +++ b/libraries/kiwi/kiwi-caching-hazelcast/src/main/java/org/apache/marmotta/kiwi/hazelcast/util/AsyncMap.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.marmotta.kiwi.hazelcast.util; + +import com.hazelcast.core.IMap; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +/** + * A Map wrapper mapping write methods to their asynchronous equivalents in Hazelcast. + * + * @author Sebastian Schaffert ([email protected]) + */ +public class AsyncMap<K,V> implements Map<K,V> { + + private IMap<K,V> delegate; + + public AsyncMap(IMap<K, V> delegate) { + this.delegate = delegate; + } + + @Override + public int size() { + return delegate.size(); + } + + @Override + public boolean isEmpty() { + return delegate.isEmpty(); + } + + @Override + public boolean containsKey(Object o) { + return delegate.containsKey(o); + } + + @Override + public boolean containsValue(Object o) { + return delegate.containsValue(o); + } + + @Override + public V get(Object o) { + return delegate.get(o); + } + + @Override + public V put(K k, V v) { + delegate.putAsync(k,v); + return null; + } + + @Override + public V remove(Object o) { + delegate.removeAsync((K)o); + return null; + } + + @Override + public void putAll(Map<? extends K, ? extends V> map) { + for(Entry<? extends K, ? 
extends V> entry : map.entrySet()) { + delegate.putAsync(entry.getKey(),entry.getValue()); + } + } + + @Override + public void clear() { + delegate.clear(); + } + + @Override + public Set<K> keySet() { + return delegate.keySet(); + } + + @Override + public Collection<V> values() { + return delegate.values(); + } + + @Override + public Set<Entry<K, V>> entrySet() { + return delegate.entrySet(); + } +} http://git-wip-us.apache.org/repos/asf/marmotta/blob/62c44dea/libraries/kiwi/kiwi-caching-hazelcast/src/test/java/org/apache/marmotta/kiwi/test/cluster/HazelcastClusterTest.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-caching-hazelcast/src/test/java/org/apache/marmotta/kiwi/test/cluster/HazelcastClusterTest.java b/libraries/kiwi/kiwi-caching-hazelcast/src/test/java/org/apache/marmotta/kiwi/test/cluster/HazelcastClusterTest.java new file mode 100644 index 0000000..c53b704 --- /dev/null +++ b/libraries/kiwi/kiwi-caching-hazelcast/src/test/java/org/apache/marmotta/kiwi/test/cluster/HazelcastClusterTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.marmotta.kiwi.test.cluster; + +import org.apache.marmotta.kiwi.caching.CacheManagerType; +import org.junit.BeforeClass; + +/** + * Add file description here! + * + * @author Sebastian Schaffert ([email protected]) + */ +public class HazelcastClusterTest extends BaseClusterTest { + + + @BeforeClass + public static void setup() { + ClusterTestSupport s = new ClusterTestSupport(CacheManagerType.HAZELCAST); + s.setup(); + } +} http://git-wip-us.apache.org/repos/asf/marmotta/blob/62c44dea/libraries/kiwi/kiwi-caching-hazelcast/src/test/java/org/apache/marmotta/kiwi/test/cluster/SerializerTest.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-caching-hazelcast/src/test/java/org/apache/marmotta/kiwi/test/cluster/SerializerTest.java b/libraries/kiwi/kiwi-caching-hazelcast/src/test/java/org/apache/marmotta/kiwi/test/cluster/SerializerTest.java new file mode 100644 index 0000000..675db87 --- /dev/null +++ b/libraries/kiwi/kiwi-caching-hazelcast/src/test/java/org/apache/marmotta/kiwi/test/cluster/SerializerTest.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.marmotta.kiwi.test.cluster; + +import com.hazelcast.config.SerializationConfig; +import com.hazelcast.config.SerializerConfig; +import com.hazelcast.nio.serialization.*; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.marmotta.commons.vocabulary.XSD; +import org.apache.marmotta.kiwi.hazelcast.serializer.*; +import org.apache.marmotta.kiwi.model.rdf.*; +import org.apache.marmotta.kiwi.test.TestValueFactory; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.vocabulary.OWL; +import org.openrdf.model.vocabulary.RDFS; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.Random; + +/** + * Test the different externalizer implementations we provide for Infinispan + * + * @author Sebastian Schaffert ([email protected]) + */ +public class SerializerTest { + + private static ValueFactory valueFactory = new TestValueFactory(); + + private static Random rnd = new Random(); + + private static Logger log = LoggerFactory.getLogger(SerializerTest.class); + + + private static SerializationService simpleService, fullService; + + @BeforeClass + public static void setup() { + simpleService = new SerializationServiceBuilder().build(); + + + SerializationConfig config = new SerializationConfig(); + SerializerConfig scBNode = new SerializerConfig().setImplementation(new BNodeSerializer()).setTypeClass(KiWiAnonResource.class); + config.addSerializerConfig(scBNode); + + SerializerConfig scBoolean = new SerializerConfig().setImplementation(new BooleanLiteralSerializer()).setTypeClass(KiWiBooleanLiteral.class); + config.addSerializerConfig(scBoolean); + + SerializerConfig scDate = new SerializerConfig().setImplementation(new 
DateLiteralSerializer()).setTypeClass(KiWiDateLiteral.class); + config.addSerializerConfig(scDate); + + SerializerConfig scDouble = new SerializerConfig().setImplementation(new DoubleLiteralSerializer()).setTypeClass(KiWiDoubleLiteral.class); + config.addSerializerConfig(scDouble); + + SerializerConfig scInt = new SerializerConfig().setImplementation(new IntLiteralSerializer()).setTypeClass(KiWiIntLiteral.class); + config.addSerializerConfig(scInt); + + SerializerConfig scString = new SerializerConfig().setImplementation(new StringLiteralSerializer()).setTypeClass(KiWiStringLiteral.class); + config.addSerializerConfig(scString); + + SerializerConfig scTriple = new SerializerConfig().setImplementation(new TripleSerializer()).setTypeClass(KiWiTriple.class); + config.addSerializerConfig(scTriple); + + SerializerConfig scUri = new SerializerConfig().setImplementation(new UriSerializer()).setTypeClass(KiWiUriResource.class); + config.addSerializerConfig(scUri); + + + fullService = new SerializationServiceBuilder().setConfig(config).build(); + + + } + + + @Test + public void testUriResource() throws Exception { + marshall((KiWiUriResource) valueFactory.createURI("http://localhost/" + RandomStringUtils.randomAlphanumeric(8)), new UriSerializer()); + } + + @Test + public void testCompressedUriResource() throws Exception { + marshall((KiWiUriResource) valueFactory.createURI(XSD.Double.stringValue()), new UriSerializer()); + marshall((KiWiUriResource) valueFactory.createURI(RDFS.LABEL.stringValue()), new UriSerializer()); + marshall((KiWiUriResource) valueFactory.createURI(OWL.SAMEAS.stringValue()), new UriSerializer()); + } + + + @Test + public void testBNode() throws Exception { + marshall((KiWiAnonResource) valueFactory.createBNode(), new BNodeSerializer()); + } + + @Test + public void testStringLiteral() throws Exception { + marshall((KiWiStringLiteral) valueFactory.createLiteral(RandomStringUtils.randomAscii(40)), new StringLiteralSerializer()); + } + + @Test + public 
void testLangLiteral() throws Exception { + marshall((KiWiStringLiteral) valueFactory.createLiteral(RandomStringUtils.randomAscii(40),"en"), new StringLiteralSerializer()); + } + + @Test + public void testTypeLiteral() throws Exception { + marshall((KiWiStringLiteral) valueFactory.createLiteral(RandomStringUtils.randomAscii(40),valueFactory.createURI("http://localhost/" + RandomStringUtils.randomAlphanumeric(8))), new StringLiteralSerializer()); + } + + + @Test + public void testIntLiteral() throws Exception { + marshall((KiWiIntLiteral) valueFactory.createLiteral(rnd.nextInt()), new IntLiteralSerializer()); + } + + + @Test + public void testTriple() throws Exception { + KiWiUriResource s = (KiWiUriResource) valueFactory.createURI("http://localhost/" + RandomStringUtils.randomAlphanumeric(8)); + KiWiUriResource p = (KiWiUriResource) valueFactory.createURI("http://localhost/" + RandomStringUtils.randomAlphanumeric(8)); + KiWiNode o = (KiWiNode) randomNode(); + KiWiTriple t = (KiWiTriple) valueFactory.createStatement(s,p,o); + + marshall(t, new TripleSerializer()); + } + + @Test + public void testPrefixCompressedTriple() throws Exception { + KiWiUriResource s = (KiWiUriResource) valueFactory.createURI("http://localhost/" + RandomStringUtils.randomAlphanumeric(8)); + KiWiUriResource p = (KiWiUriResource) valueFactory.createURI("http://localhost/" + RandomStringUtils.randomAlphanumeric(8)); + KiWiUriResource o = (KiWiUriResource) valueFactory.createURI("http://localhost/" + RandomStringUtils.randomAlphanumeric(8)); + KiWiTriple t = (KiWiTriple) valueFactory.createStatement(s,p,o); + + marshall(t, new TripleSerializer()); + } + + + /** + * Run the given object through the marshaller using an in-memory stream. 
+ * @param origin + * @param <T> + * @return + */ + private <T> void marshall(T origin, StreamSerializer<T> externalizer) throws IOException, ClassNotFoundException, InterruptedException { + log.info("- testing Java ObjectStream ..."); + ByteArrayOutputStream outBytesOS = new ByteArrayOutputStream(); + ObjectOutputStream outOS = new ObjectOutputStream(outBytesOS); + + outOS.writeObject(origin); + + outOS.close(); + + log.info(" object {}: serialized with {} bytes", origin, outBytesOS.size()); + + + log.info("- testing serializer directly ..."); + ByteArrayOutputStream outBytes = new ByteArrayOutputStream(); + ObjectDataOutputStream out = new ObjectDataOutputStream(outBytes, simpleService); + + externalizer.write(out, origin); + out.close(); + + log.info(" object {}: serialized with {} bytes", origin, outBytes.size()); + + ByteArrayInputStream inBytes = new ByteArrayInputStream(outBytes.toByteArray()); + ObjectDataInputStream in = new ObjectDataInputStream(inBytes, simpleService); + + T destination1 = externalizer.read(in); + + Assert.assertEquals(origin,destination1); + + + + log.info("- testing serializer with Hazelcast serialization service ..."); + + + ByteArrayOutputStream outBytesFull = new ByteArrayOutputStream(); + ObjectDataOutputStream outFull = new ObjectDataOutputStream(outBytesFull, fullService); + + fullService.writeObject(outFull, origin); + outFull.close(); + + log.info(" object {}: serialized with {} bytes", origin, outBytesFull.size()); + + ByteArrayInputStream inBytesFull = new ByteArrayInputStream(outBytesFull.toByteArray()); + ObjectDataInputStream inFull = new ObjectDataInputStream(inBytesFull, fullService); + + T destination2 = (T) fullService.readObject(inFull); + + Assert.assertEquals(origin, destination2); + + } + + + /** + * Return a random RDF value, either a reused object (10% chance) or of any other kind. 
+ * @return + */ + protected Value randomNode() { + Value object; + switch(rnd.nextInt(6)) { + case 0: object = valueFactory.createURI("http://localhost/" + RandomStringUtils.randomAlphanumeric(8)); + break; + case 1: object = valueFactory.createBNode(); + break; + case 2: object = valueFactory.createLiteral(RandomStringUtils.randomAscii(40)); + break; + case 3: object = valueFactory.createLiteral(rnd.nextInt()); + break; + case 4: object = valueFactory.createLiteral(rnd.nextDouble()); + break; + case 5: object = valueFactory.createLiteral(rnd.nextBoolean()); + break; + default: object = valueFactory.createURI("http://localhost/" + RandomStringUtils.randomAlphanumeric(8)); + break; + + } + return object; + } + +} http://git-wip-us.apache.org/repos/asf/marmotta/blob/62c44dea/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/embedded/InfinispanEmbeddedCacheManager.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/embedded/InfinispanEmbeddedCacheManager.java b/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/embedded/InfinispanEmbeddedCacheManager.java index 5818e0b..85ed56b 100644 --- a/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/embedded/InfinispanEmbeddedCacheManager.java +++ b/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/embedded/InfinispanEmbeddedCacheManager.java @@ -129,7 +129,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager { .build(); cacheManager = new DefaultCacheManager(globalConfiguration, defaultConfiguration, true); - log.info("initialised local cache manager"); + log.info("initialised Infinispan local cache manager"); } /** @@ -182,7 +182,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager { .build(); cacheManager = new DefaultCacheManager(globalConfiguration, 
defaultConfiguration, true); - log.info("initialised distributed cache manager (cluster name: {})", globalConfiguration.transport().clusterName()); + log.info("initialised Infinispan distributed cache manager (cluster name: {})", globalConfiguration.transport().clusterName()); } @@ -229,7 +229,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager { .build(); cacheManager = new DefaultCacheManager(globalConfiguration, defaultConfiguration, true); - log.info("initialised replicated cache manager (cluster name: {})", globalConfiguration.transport().clusterName()); + log.info("initialised Infinispan replicated cache manager (cluster name: {})", globalConfiguration.transport().clusterName()); } http://git-wip-us.apache.org/repos/asf/marmotta/blob/62c44dea/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java b/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java deleted file mode 100644 index 981e595..0000000 --- a/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.marmotta.kiwi.test; - -import com.google.common.hash.Hasher; -import com.google.common.hash.Hashing; -import org.apache.marmotta.kiwi.caching.CacheManager; -import org.apache.marmotta.kiwi.config.KiWiConfiguration; -import org.apache.marmotta.kiwi.embedded.InfinispanEmbeddedCacheManagerFactory; -import org.apache.marmotta.kiwi.persistence.h2.H2Dialect; -import org.apache.marmotta.kiwi.sail.KiWiStore; -import org.junit.*; -import org.openrdf.model.URI; -import org.openrdf.repository.Repository; -import org.openrdf.repository.RepositoryException; -import org.openrdf.repository.sail.SailRepository; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Add file description here! 
- * - * @author Sebastian Schaffert ([email protected]) - */ -public class ClusterTest { - - private static Logger log = LoggerFactory.getLogger(ClusterTest.class); - - KiWiConfiguration config1, config2; - - KiWiStore store1, store2; - - Repository repository1, repository2; - - CacheManager cacheManager1, cacheManager2; - - @Before - public void setup() throws RepositoryException { - config1 = new KiWiConfiguration( - "default-H2", - "jdbc:h2:mem:kiwitest;MVCC=true;DB_CLOSE_ON_EXIT=TRUE;DB_CLOSE_DELAY=-1", - "kiwi", "kiwi", - new H2Dialect()); - config1.setDatacenterId(1); - config1.setClustered(true); - config1.setCacheManager(InfinispanEmbeddedCacheManagerFactory.class.getName()); - - config2 = new KiWiConfiguration( - "default-H2", - "jdbc:h2:mem:kiwitest;MVCC=true;DB_CLOSE_ON_EXIT=TRUE;DB_CLOSE_DELAY=-1", - "kiwi", "kiwi", - new H2Dialect()); - config2.setDatacenterId(2); - config2.setClustered(true); - config2.setCacheManager(InfinispanEmbeddedCacheManagerFactory.class.getName()); - - - - } - - public void setupCluster(int port1, int port2) throws RepositoryException { - config1.setClusterPort(port1); - config2.setClusterPort(port2); - - store1 = new KiWiStore(config1); - store2 = new KiWiStore(config2); - - repository1 = new SailRepository(store1); - repository2 = new SailRepository(store2); - - repository1.initialize(); - repository2.initialize(); - - cacheManager1 = store1.getPersistence().getCacheManager(); - cacheManager2 = store2.getPersistence().getCacheManager(); - } - - - @After - public void teardown() throws RepositoryException { - repository1.shutDown(); - repository2.shutDown(); - } - - - @Test - @Ignore - public void testClusteredCacheSync() throws InterruptedException, RepositoryException { - setupCluster(61222,61222); - - log.info("testing cache synchronization ..."); - - URI u = repository1.getValueFactory().createURI("http://localhost/test1"); - - - // give the cluster some time to performance asynchronous balancing - Thread.sleep(100); - - 
- log.debug("test if resource is in cache where it was created ..."); - URI u1 = (URI) cacheManager1.getUriCache().get(createCacheKey("http://localhost/test1")); - - Assert.assertNotNull(u1); - Assert.assertEquals(u,u1); - - log.debug("test if resource has been synced to other cache in cluster ..."); - URI u2 = (URI) cacheManager2.getUriCache().get(createCacheKey("http://localhost/test1")); - - Assert.assertNotNull(u2); - Assert.assertEquals(u,u2); - } - - @Test - @Ignore - public void testDisjointClusters() throws InterruptedException, RepositoryException { - setupCluster(61224,61225); - - log.info("testing caches on different ports ..."); - - URI u = repository1.getValueFactory().createURI("http://localhost/test1"); - - - // give the cluster some time to performance asynchronous balancing - Thread.sleep(100); - - log.debug("test if resource is in cache where it was created ..."); - URI u1 = (URI) cacheManager1.getUriCache().get(createCacheKey("http://localhost/test1")); - - Assert.assertNotNull(u1); - Assert.assertEquals(u,u1); - - log.debug("test if resource has been synced to other cache in cluster ..."); - URI u2 = (URI) cacheManager2.getUriCache().get(createCacheKey("http://localhost/test1")); - - Assert.assertNull(u2); - } - - - private static Long createCacheKey(String svalue) { - Hasher hasher = Hashing.goodFastHash(64).newHasher(); - hasher.putString(svalue); - return hasher.hash().asLong(); - } - -} http://git-wip-us.apache.org/repos/asf/marmotta/blob/62c44dea/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/cluster/BaseClusterTest.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/cluster/BaseClusterTest.java b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/cluster/BaseClusterTest.java new file mode 100644 index 0000000..4733aa9 --- /dev/null +++ 
b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/cluster/BaseClusterTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.marmotta.kiwi.test.cluster;
+
+import org.apache.marmotta.kiwi.caching.CacheManager;
+import org.apache.marmotta.kiwi.caching.CacheManagerType;
+import org.apache.marmotta.kiwi.config.KiWiConfiguration;
+import org.apache.marmotta.kiwi.persistence.h2.H2Dialect;
+import org.apache.marmotta.kiwi.sail.KiWiStore;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+import org.openrdf.model.URI;
+import org.openrdf.repository.Repository;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.repository.sail.SailRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Common base class for cache cluster tests.
+ * <p>
+ * The tests work on four clustered KiWi repositories held in static fields: a "sync" pair whose
+ * members are configured with the same cluster port, and an "async" pair whose members are
+ * configured with different cluster ports. The repositories are created by
+ * {@link ClusterTestSupport#setup()}, which has to run before the first test; presumably concrete
+ * subclasses call it from a {@code @BeforeClass} method with the cache backend they want to
+ * exercise (those callers are not part of this file).
+ *
+ * @author Sebastian Schaffert ([email protected])
+ */
+public abstract class BaseClusterTest {
+
+    private static final Logger log = LoggerFactory.getLogger(BaseClusterTest.class);
+
+    /** URI that is created through one repository and then looked up in the URI caches. */
+    private static final String TEST_URI = "http://localhost/test1";
+
+    /** Pause in milliseconds that gives the cluster time to perform asynchronous balancing. */
+    private static final long CLUSTER_SETTLE_MILLIS = 100;
+
+    /** Next datacenter id to hand out; every repository created by the support class takes one. */
+    private static int datacenterIds = 1;
+
+    // "Sync" repositories share one cluster port, "Async" repositories use different ports
+    private static Repository repositorySync1, repositorySync2, repositoryAsync1, repositoryAsync2;
+
+    private static CacheManager cacheManagerSync1, cacheManagerSync2, cacheManagerAsync1, cacheManagerAsync2;
+
+
+    /**
+     * Shuts down every repository that was actually created and clears the static references.
+     * <p>
+     * Repositories that are still {@code null} (setup failed early or never ran) are skipped, and a
+     * failing shutdown does not prevent the remaining repositories from being shut down.
+     *
+     * @throws RepositoryException the first failure reported by a repository shutdown, if any
+     */
+    @AfterClass
+    public static void teardown() throws RepositoryException {
+        Repository[] repositories = { repositorySync1, repositorySync2, repositoryAsync1, repositoryAsync2 };
+
+        // drop the references first so that a later test class never sees stale repositories
+        repositorySync1 = repositorySync2 = repositoryAsync1 = repositoryAsync2 = null;
+        cacheManagerSync1 = cacheManagerSync2 = cacheManagerAsync1 = cacheManagerAsync2 = null;
+
+        RepositoryException firstFailure = null;
+        for (Repository repository : repositories) {
+            if (repository == null) {
+                continue;
+            }
+            try {
+                repository.shutDown();
+            } catch (RepositoryException ex) {
+                if (firstFailure == null) {
+                    firstFailure = ex;
+                } else {
+                    log.warn("another repository failed to shut down", ex);
+                }
+            }
+        }
+
+        if (firstFailure != null) {
+            throw firstFailure;
+        }
+    }
+
+
+    /**
+     * Verifies that a URI created through the first repository of the pair sharing a cluster port
+     * can be read from the URI caches of both repositories of that pair.
+     */
+    @Test
+    public void testClusteredCacheSync() throws InterruptedException, RepositoryException {
+
+        log.info("testing cache synchronization ...");
+
+        URI u = repositorySync1.getValueFactory().createURI(TEST_URI);
+
+        // give the cluster some time to perform asynchronous balancing
+        Thread.sleep(CLUSTER_SETTLE_MILLIS);
+
+        log.debug("test if resource is in cache where it was created ...");
+        URI u1 = (URI) cacheManagerSync1.getUriCache().get(TEST_URI);
+
+        Assert.assertNotNull(u1);
+        Assert.assertEquals(u, u1);
+
+        log.debug("test if resource has been synced to other cache in cluster ...");
+        URI u2 = (URI) cacheManagerSync2.getUriCache().get(TEST_URI);
+
+        Assert.assertNotNull(u2);
+        Assert.assertEquals(u, u2);
+    }
+
+    /**
+     * Verifies that a URI created through the first repository of the pair using different cluster
+     * ports is cached there but is absent from the URI cache of the second repository.
+     */
+    @Test
+    public void testDisjointClusters() throws InterruptedException, RepositoryException {
+
+        log.info("testing caches on different ports ...");
+
+        URI u = repositoryAsync1.getValueFactory().createURI(TEST_URI);
+
+        // give the cluster some time to perform asynchronous balancing
+        Thread.sleep(CLUSTER_SETTLE_MILLIS);
+
+        log.debug("test if resource is in cache where it was created ...");
+        URI u1 = (URI) cacheManagerAsync1.getUriCache().get(TEST_URI);
+
+        Assert.assertNotNull(u1);
+        Assert.assertEquals(u, u1);
+
+        // the second repository uses another port, so the entry must NOT show up in its cache
+        log.debug("test if resource has been synced to other cache in cluster ...");
+        URI u2 = (URI) cacheManagerAsync2.getUriCache().get(TEST_URI);
+
+        Assert.assertNull(u2);
+    }
+
+
+    /**
+     * Creates the shared repositories and cache managers for one cache manager type.
+     */
+    protected static class ClusterTestSupport {
+
+        CacheManagerType type;
+
+        /**
+         * @param type cache manager type set on every repository configuration created by this helper
+         */
+        protected ClusterTestSupport(CacheManagerType type) {
+            this.type = type;
+        }
+
+        /**
+         * Creates and initializes the four repositories and stores them, together with their cache
+         * managers, in the static fields of {@link BaseClusterTest}.
+         *
+         * @throws AssertionError if a repository cannot be initialized; the triggering
+         *                        {@link RepositoryException} is attached as cause
+         */
+        public void setup() {
+            try {
+                // same port for both members of the "sync" pair
+                repositorySync1 = createRepository(61222);
+                repositorySync2 = createRepository(61222);
+                // different ports for the members of the "async" pair
+                repositoryAsync1 = createRepository(61223);
+                repositoryAsync2 = createRepository(61224);
+
+                cacheManagerSync1 = getCacheManager(repositorySync1);
+                cacheManagerSync2 = getCacheManager(repositorySync2);
+                cacheManagerAsync1 = getCacheManager(repositoryAsync1);
+                cacheManagerAsync2 = getCacheManager(repositoryAsync2);
+            } catch (RepositoryException ex) {
+                // raise an AssertionError as before, but attach the original exception as its cause;
+                // repositories assigned so far are left in place for teardown() to shut down
+                AssertionError failure = new AssertionError(ex.getMessage());
+                failure.initCause(ex);
+                throw failure;
+            }
+        }
+
+
+        /**
+         * Builds a clustered configuration on the in-memory H2 database using the next datacenter
+         * id and the given cluster port, and returns an initialized repository for it.
+         *
+         * @param port cluster port set on the configuration
+         * @return the initialized repository
+         * @throws RepositoryException if initializing the repository fails
+         */
+        private Repository createRepository(int port) throws RepositoryException {
+            KiWiConfiguration config = new KiWiConfiguration(
+                    "default-H2",
+                    "jdbc:h2:mem:kiwitest;MVCC=true;DB_CLOSE_ON_EXIT=TRUE;DB_CLOSE_DELAY=-1",
+                    "kiwi", "kiwi",
+                    new H2Dialect());
+            config.setDatacenterId(datacenterIds++);
+            config.setClustered(true);
+            config.setCacheManager(type);
+            config.setClusterPort(port);
+
+            Repository repository = new SailRepository(new KiWiStore(config));
+            repository.initialize();
+
+            return repository;
+        }
+
+        /**
+         * Returns the cache manager of the KiWi store behind the given repository.
+         *
+         * @param repository a {@link SailRepository} whose sail is a {@link KiWiStore}
+         */
+        private static CacheManager getCacheManager(Repository repository) {
+            return ((KiWiStore)((SailRepository)repository).getSail()).getPersistence().getCacheManager();
+        }
+
+    }
+}
