work on externalizer
Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/1698c6cb Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/1698c6cb Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/1698c6cb Branch: refs/heads/develop Commit: 1698c6cb722eb5dc0db2a382282458fe7b12b321 Parents: 2e324d8 Author: Sebastian Schaffert <[email protected]> Authored: Mon Mar 3 17:27:19 2014 +0100 Committer: Sebastian Schaffert <[email protected]> Committed: Mon Mar 3 17:27:19 2014 +0100 ---------------------------------------------------------------------- .../InfinispanEmbeddedCacheManager.java | 310 ++++++++++--------- .../kiwi/externalizer/TripleExternalizer.java | 120 +++---- .../remote/InfinispanRemoteCacheManager.java | 114 +++++++ .../apache/marmotta/kiwi/test/ClusterTest.java | 160 ++++++++++ .../test/externalizer/ExternalizerTest.java | 27 +- .../apache/marmotta/kiwi/test/ClusterTest.java | 157 ---------- 6 files changed, 521 insertions(+), 367 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/marmotta/blob/1698c6cb/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/embedded/InfinispanEmbeddedCacheManager.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/embedded/InfinispanEmbeddedCacheManager.java b/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/embedded/InfinispanEmbeddedCacheManager.java index 35c2018..5818e0b 100644 --- a/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/embedded/InfinispanEmbeddedCacheManager.java +++ b/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/embedded/InfinispanEmbeddedCacheManager.java @@ -21,7 +21,6 @@ import org.apache.commons.io.IOUtils; import org.apache.marmotta.kiwi.caching.CacheManager; import org.apache.marmotta.kiwi.config.KiWiConfiguration; import org.apache.marmotta.kiwi.externalizer.*; -import org.apache.marmotta.kiwi.persistence.KiWiPersistence; import org.infinispan.Cache; import org.infinispan.commons.CacheException; import org.infinispan.commons.marshall.AdvancedExternalizer; @@ -44,9 +43,6 @@ import java.util.Iterator; import java.util.Set; import java.util.concurrent.TimeUnit; -import static org.apache.marmotta.kiwi.config.CacheMode.DISTRIBUTED; -import static org.apache.marmotta.kiwi.config.CacheMode.REPLICATED; - /** * A class for managing the different caches that are used by the triple store. * <p/> @@ -68,19 +64,12 @@ public class InfinispanEmbeddedCacheManager implements CacheManager { private EmbeddedCacheManager cacheManager; - private GlobalConfiguration globalConfiguration; - // default configuration: distributed cache, 100000 entries, 300 seconds expiration, 60 seconds idle private Configuration defaultConfiguration; - private boolean clustered, embedded; - - private KiWiConfiguration kiWiConfiguration; - - private KiWiPersistence persistence; + private KiWiConfiguration config; - - private Cache nodeCache, tripleCache, uriCache, literalCache, bnodeCache, nsPrefixCache, nsUriCache, loaderCache, registryCache; + private Cache nodeCache, tripleCache, uriCache, literalCache, bnodeCache, nsPrefixCache, nsUriCache, registryCache; /** @@ -90,120 +79,169 @@ public class InfinispanEmbeddedCacheManager implements CacheManager { */ public InfinispanEmbeddedCacheManager(KiWiConfiguration config) { - this.clustered = config.isClustered(); - this.kiWiConfiguration = config; - - if(clustered && (config.getCacheMode() == DISTRIBUTED || config.getCacheMode() == REPLICATED)) { - try { - String jgroupsXml = IOUtils.toString(InfinispanEmbeddedCacheManager.class.getResourceAsStream("/jgroups-kiwi.xml")); - - jgroupsXml = jgroupsXml.replaceAll("mcast_addr=\"[0-9.]+\"", String.format("mcast_addr=\"%s\"", config.getClusterAddress())); - jgroupsXml = jgroupsXml.replaceAll("mcast_port=\"[0-9]+\"", String.format("mcast_port=\"%d\"", config.getClusterPort())); - - - globalConfiguration = new GlobalConfigurationBuilder() - .classLoader(InfinispanEmbeddedCacheManager.class.getClassLoader()) - .transport() - .defaultTransport() - .clusterName(config.getClusterName()) - .machineId("instance-" + config.getDatacenterId()) - .addProperty("configurationXml", jgroupsXml) - .globalJmxStatistics() - .jmxDomain("org.apache.marmotta.kiwi") - .allowDuplicateDomains(true) - .serialization() - .addAdvancedExternalizer(getExternalizers()) - .build(); - } catch (IOException ex) { - log.warn("error loading JGroups configuration from archive: {}", ex.getMessage()); - log.warn("some configuration options will not be available"); - - globalConfiguration = new GlobalConfigurationBuilder() - .classLoader(InfinispanEmbeddedCacheManager.class.getClassLoader()) - .transport() - .defaultTransport() - .clusterName(config.getClusterName()) - .machineId("instance-" + config.getDatacenterId()) - .addProperty("configurationFile", "jgroups-kiwi.xml") - .globalJmxStatistics() - .jmxDomain("org.apache.marmotta.kiwi") - .allowDuplicateDomains(true) - .serialization() - .addAdvancedExternalizer(getExternalizers()) - .build(); - } + this.config = config; - if(config.getCacheMode() == DISTRIBUTED) { - defaultConfiguration = new ConfigurationBuilder() - .clustering() - .cacheMode(CacheMode.DIST_ASYNC) - .async() - .asyncMarshalling() - .l1() - .lifespan(5, TimeUnit.MINUTES) - .hash() - .numOwners(2) - .numSegments(40) - .consistentHashFactory(new SyncConsistentHashFactory()) - .stateTransfer() - .fetchInMemoryState(false) - .eviction() - .strategy(EvictionStrategy.LIRS) - .maxEntries(100000) - .expiration() - .lifespan(30, TimeUnit.MINUTES) - .maxIdle(10, TimeUnit.MINUTES) - .build(); - } else { - defaultConfiguration = new ConfigurationBuilder() - .clustering() - .cacheMode(CacheMode.REPL_ASYNC) - .async() - .asyncMarshalling() - .stateTransfer() - .fetchInMemoryState(false) - .eviction() - .strategy(EvictionStrategy.LIRS) - .maxEntries(100000) - .expiration() - .lifespan(30, TimeUnit.MINUTES) - .maxIdle(10, TimeUnit.MINUTES) - .build(); + try { + switch (config.getCacheMode()) { + case DISTRIBUTED: + buildDistributedConfiguration(getExternalizers()); + break; + case REPLICATED: + buildReplicatedConfiguration(getExternalizers()); + break; + case LOCAL: + buildLocalConfiguration(); + break; } - } else { - globalConfiguration = new GlobalConfigurationBuilder() - .classLoader(InfinispanEmbeddedCacheManager.class.getClassLoader()) - .globalJmxStatistics() - .jmxDomain("org.apache.marmotta.kiwi") - .allowDuplicateDomains(true) - .build(); + } catch (IOException ex) { + log.warn("error while building cache cluster configuration, reverting to local cache"); + buildLocalConfiguration(); + } - defaultConfiguration = new ConfigurationBuilder() - .clustering() - .cacheMode(CacheMode.LOCAL) - .eviction() - .strategy(EvictionStrategy.LIRS) - .maxEntries(100000) - .expiration() + + } + + /** + * Build a local cache configuration. + * <p/> + * In local cache mode, the cache is not shared among the servers in a cluster. Each machine keeps a local cache. + * This allows quick startups and eliminates network traffic in the cluster, but subsequent requests to different + * cluster members cannot benefit from the cached data. + */ + protected void buildLocalConfiguration() { + GlobalConfiguration globalConfiguration = new GlobalConfigurationBuilder() + .classLoader(InfinispanEmbeddedCacheManager.class.getClassLoader()) + .globalJmxStatistics() + .jmxDomain("org.apache.marmotta.kiwi") + .allowDuplicateDomains(true) + .build(); + + defaultConfiguration = new ConfigurationBuilder() + .clustering() + .cacheMode(CacheMode.LOCAL) + .eviction() + .strategy(EvictionStrategy.LIRS) + .maxEntries(100000) + .expiration() + .lifespan(5, TimeUnit.MINUTES) + .maxIdle(1, TimeUnit.MINUTES) + .build(); + cacheManager = new DefaultCacheManager(globalConfiguration, defaultConfiguration, true); + + log.info("initialised local cache manager"); + } + + /** + * Build a distributed cache configuration. + * <p/> + * In distributed cache mode, the cluster forms a big hash table used as a cache. This allows to make efficient + * use of the large amount of memory available, but requires cache rebalancing and a lot of network transfers, + * especially in case cluster members are restarted often. + */ + protected void buildDistributedConfiguration(AdvancedExternalizer...externalizers) throws IOException { + String jgroupsXml = IOUtils.toString(InfinispanEmbeddedCacheManager.class.getResourceAsStream("/jgroups-kiwi.xml")); + + jgroupsXml = jgroupsXml.replaceAll("mcast_addr=\"[0-9.]+\"", String.format("mcast_addr=\"%s\"", config.getClusterAddress())); + jgroupsXml = jgroupsXml.replaceAll("mcast_port=\"[0-9]+\"", String.format("mcast_port=\"%d\"", config.getClusterPort())); + + + GlobalConfiguration globalConfiguration = new GlobalConfigurationBuilder() + .classLoader(InfinispanEmbeddedCacheManager.class.getClassLoader()) + .transport() + .defaultTransport() + .clusterName(config.getClusterName()) + .machineId("instance-" + config.getDatacenterId()) + .addProperty("configurationXml", jgroupsXml) + .globalJmxStatistics() + .jmxDomain("org.apache.marmotta.kiwi") + .allowDuplicateDomains(true) + .serialization() + .addAdvancedExternalizer(externalizers) + .build(); + + defaultConfiguration = new ConfigurationBuilder() + .clustering() + .cacheMode(CacheMode.DIST_ASYNC) + .async() + .asyncMarshalling() + .l1() .lifespan(5, TimeUnit.MINUTES) - .maxIdle(1, TimeUnit.MINUTES) - .build(); + .hash() + .numOwners(2) + .numSegments(40) + .consistentHashFactory(new SyncConsistentHashFactory()) + .stateTransfer() + .fetchInMemoryState(false) + .eviction() + .strategy(EvictionStrategy.LIRS) + .maxEntries(100000) + .expiration() + .lifespan(30, TimeUnit.MINUTES) + .maxIdle(10, TimeUnit.MINUTES) + .build(); + cacheManager = new DefaultCacheManager(globalConfiguration, defaultConfiguration, true); - } + log.info("initialised distributed cache manager (cluster name: {})", globalConfiguration.transport().clusterName()); + } + /** + * Build a replicated cache configuration. + * <p/> + * In replicated cache mode, each node in the cluster has an identical copy of all cache data. This allows + * very efficient cache lookups and reduces the rebalancing effort, but requires more memory. + */ + protected void buildReplicatedConfiguration(AdvancedExternalizer...externalizers) throws IOException { + String jgroupsXml = IOUtils.toString(InfinispanEmbeddedCacheManager.class.getResourceAsStream("/jgroups-kiwi.xml")); + + jgroupsXml = jgroupsXml.replaceAll("mcast_addr=\"[0-9.]+\"", String.format("mcast_addr=\"%s\"", config.getClusterAddress())); + jgroupsXml = jgroupsXml.replaceAll("mcast_port=\"[0-9]+\"", String.format("mcast_port=\"%d\"", config.getClusterPort())); + + + GlobalConfiguration globalConfiguration = new GlobalConfigurationBuilder() + .classLoader(InfinispanEmbeddedCacheManager.class.getClassLoader()) + .transport() + .defaultTransport() + .clusterName(config.getClusterName()) + .machineId("instance-" + config.getDatacenterId()) + .addProperty("configurationXml", jgroupsXml) + .globalJmxStatistics() + .jmxDomain("org.apache.marmotta.kiwi") + .allowDuplicateDomains(true) + .serialization() + .addAdvancedExternalizer(externalizers) + .build(); + + defaultConfiguration = new ConfigurationBuilder() + .clustering() + .cacheMode(CacheMode.REPL_ASYNC) + .async() + .asyncMarshalling() + .stateTransfer() + .fetchInMemoryState(false) + .eviction() + .strategy(EvictionStrategy.LIRS) + .maxEntries(100000) + .expiration() + .lifespan(30, TimeUnit.MINUTES) + .maxIdle(10, TimeUnit.MINUTES) + .build(); cacheManager = new DefaultCacheManager(globalConfiguration, defaultConfiguration, true); - log.info("initialised Infinispan cache manager ({})", globalConfiguration.isClustered() ? "cluster name: "+globalConfiguration.transport().clusterName() : "single host"); + log.info("initialised replicated cache manager (cluster name: {})", globalConfiguration.transport().clusterName()); + } + - this.embedded = true; + protected boolean isClustered() { + return config.getCacheMode() == org.apache.marmotta.kiwi.config.CacheMode.DISTRIBUTED || + config.getCacheMode() == org.apache.marmotta.kiwi.config.CacheMode.REPLICATED; } private AdvancedExternalizer[] getExternalizers() { return new AdvancedExternalizer[] { - new TripleExternalizer(persistence), + new TripleExternalizer(), new UriExternalizer(), new BNodeExternalizer(), new StringLiteralExternalizer(), @@ -249,7 +287,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager { .clustering() .cacheMode(CacheMode.LOCAL) .eviction() - .maxEntries(kiWiConfiguration.getTripleCacheSize()) + .maxEntries(config.getTripleCacheSize()) .expiration() .lifespan(60, TimeUnit.MINUTES) .maxIdle(30, TimeUnit.MINUTES) @@ -272,7 +310,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager { if(uriCache == null) { Configuration uriConfiguration = new ConfigurationBuilder().read(defaultConfiguration) .eviction() - .maxEntries(kiWiConfiguration.getUriCacheSize()) + .maxEntries(config.getUriCacheSize()) .build(); cacheManager.defineConfiguration(URI_CACHE, uriConfiguration); @@ -292,7 +330,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager { if(bnodeCache == null) { Configuration bnodeConfiguration = new ConfigurationBuilder().read(defaultConfiguration) .eviction() - .maxEntries(kiWiConfiguration.getBNodeCacheSize()) + .maxEntries(config.getBNodeCacheSize()) .build(); cacheManager.defineConfiguration(BNODE_CACHE, bnodeConfiguration); @@ -312,7 +350,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager { if(literalCache == null) { Configuration literalConfiguration = new ConfigurationBuilder().read(defaultConfiguration) .eviction() - .maxEntries(kiWiConfiguration.getLiteralCacheSize()) + .maxEntries(config.getLiteralCacheSize()) .build(); cacheManager.defineConfiguration(LITERAL_CACHE, literalConfiguration); @@ -328,12 +366,12 @@ public class InfinispanEmbeddedCacheManager implements CacheManager { */ public Cache getNamespaceUriCache() { if(nsUriCache == null) { - if(clustered) { + if(isClustered()) { Configuration nsuriConfiguration = new ConfigurationBuilder().read(defaultConfiguration) .clustering() .cacheMode(CacheMode.REPL_ASYNC) .eviction() - .maxEntries(kiWiConfiguration.getNamespaceCacheSize()) + .maxEntries(config.getNamespaceCacheSize()) .expiration() .lifespan(1, TimeUnit.DAYS) .build(); @@ -341,7 +379,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager { } else { Configuration nsuriConfiguration = new ConfigurationBuilder().read(defaultConfiguration) .eviction() - .maxEntries(kiWiConfiguration.getNamespaceCacheSize()) + .maxEntries(config.getNamespaceCacheSize()) .expiration() .lifespan(1, TimeUnit.HOURS) .build(); @@ -359,12 +397,12 @@ public class InfinispanEmbeddedCacheManager implements CacheManager { */ public Cache getNamespacePrefixCache() { if(nsPrefixCache == null) { - if(clustered) { + if(isClustered()) { Configuration nsprefixConfiguration = new ConfigurationBuilder().read(defaultConfiguration) .clustering() .cacheMode(CacheMode.REPL_ASYNC) .eviction() - .maxEntries(kiWiConfiguration.getNamespaceCacheSize()) + .maxEntries(config.getNamespaceCacheSize()) .expiration() .lifespan(1, TimeUnit.DAYS) .build(); @@ -373,7 +411,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager { } else { Configuration nsprefixConfiguration = new ConfigurationBuilder().read(defaultConfiguration) .eviction() - .maxEntries(kiWiConfiguration.getNamespaceCacheSize()) + .maxEntries(config.getNamespaceCacheSize()) .expiration() .lifespan(1, TimeUnit.HOURS) .build(); @@ -395,7 +433,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager { */ public Cache getRegistryCache() { if(registryCache == null) { - if(clustered) { + if(isClustered()) { Configuration registryConfiguration = new ConfigurationBuilder() .clustering() .cacheMode(CacheMode.REPL_SYNC) @@ -436,31 +474,6 @@ public class InfinispanEmbeddedCacheManager implements CacheManager { } /** - * Return the Infinispan cache manager used by the caching infrastructure. - * - * @return - */ - public EmbeddedCacheManager getCacheManager() { - return cacheManager; - } - - /** - * Return the global cache manager configuration used by the caching infrastructure. - * @return - */ - public GlobalConfiguration getGlobalConfiguration() { - return globalConfiguration; - } - - /** - * Return the default cache configuration used by the caching infrastructure. - * @return - */ - public Configuration getDefaultConfiguration() { - return defaultConfiguration; - } - - /** * Clear all caches managed by this cache manager. */ public void clear() { @@ -479,7 +492,6 @@ public class InfinispanEmbeddedCacheManager implements CacheManager { bnodeCache = null; nsPrefixCache = null; nsUriCache = null; - loaderCache = null; registryCache = null; } @@ -488,7 +500,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager { */ public void shutdown() { try { - if(embedded && cacheManager.getStatus() == ComponentStatus.RUNNING) { + if(cacheManager.getStatus() == ComponentStatus.RUNNING) { log.warn("shutting down cache manager ..."); // if(cacheManager.getTransport() != null) { // log.info("... shutting down transport ..."); http://git-wip-us.apache.org/repos/asf/marmotta/blob/1698c6cb/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/externalizer/TripleExternalizer.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/externalizer/TripleExternalizer.java b/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/externalizer/TripleExternalizer.java index e4c584c..05e6933 100644 --- a/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/externalizer/TripleExternalizer.java +++ b/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/externalizer/TripleExternalizer.java @@ -17,21 +17,17 @@ package org.apache.marmotta.kiwi.externalizer; +import org.apache.commons.lang3.StringUtils; import org.apache.marmotta.kiwi.model.rdf.KiWiNode; import org.apache.marmotta.kiwi.model.rdf.KiWiResource; import org.apache.marmotta.kiwi.model.rdf.KiWiTriple; import org.apache.marmotta.kiwi.model.rdf.KiWiUriResource; -import org.apache.marmotta.kiwi.persistence.KiWiConnection; -import org.apache.marmotta.kiwi.persistence.KiWiPersistence; import org.infinispan.commons.marshall.AdvancedExternalizer; import org.infinispan.commons.util.Util; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; -import java.sql.SQLException; import java.util.Date; import java.util.Set; @@ -41,13 +37,10 @@ import java.util.Set; */ public class TripleExternalizer implements AdvancedExternalizer<KiWiTriple> { - private static Logger log = LoggerFactory.getLogger(TripleExternalizer.class); - private KiWiPersistence persistence; + public static final int MODE_DEFAULT = 1; + public static final int MODE_PREFIX = 2; - public TripleExternalizer(KiWiPersistence persistence) { - this.persistence = persistence; - } @Override public Set<Class<? extends KiWiTriple>> getTypeClasses() { @@ -62,11 +55,39 @@ public class TripleExternalizer implements AdvancedExternalizer<KiWiTriple> { @Override public void writeObject(ObjectOutput output, KiWiTriple object) throws IOException { output.writeLong(object.getId()); - output.writeLong(object.getSubject().getId()); - output.writeLong(object.getPredicate().getId()); - output.writeLong(object.getObject().getId()); - output.writeLong(object.getContext() != null ? object.getContext().getId() : -1L); - output.writeLong(object.getCreator() != null ? object.getCreator().getId() : -1L); + + // in case subject and object are both uris we use a special prefix-compressed mode + if(object.getSubject().isUriResource() && object.getObject().isUriResource()) { + String sUri = object.getSubject().stringValue(); + String oUri = object.getObject().stringValue(); + + String prefix = StringUtils.getCommonPrefix(sUri,oUri); + + output.writeByte(MODE_PREFIX); + output.writeInt(prefix.length()); + output.writeChars(prefix); + + output.writeLong(object.getSubject().getId()); + output.writeInt(sUri.length() - prefix.length()); + output.writeChars(sUri.substring(prefix.length())); + output.writeLong(object.getSubject().getCreated().getTime()); + + output.writeObject(object.getPredicate()); + + output.writeLong(object.getObject().getId()); + output.writeInt(oUri.length() - prefix.length()); + output.writeChars(oUri.substring(prefix.length())); + output.writeLong(object.getObject().getCreated().getTime()); + } else { + output.writeByte(MODE_DEFAULT); + + output.writeObject(object.getSubject()); + output.writeObject(object.getPredicate()); + output.writeObject(object.getObject()); + } + + output.writeObject(object.getContext()); + output.writeObject(object.getCreator()); output.writeBoolean(object.isDeleted()); output.writeBoolean(object.isInferred()); output.writeBoolean(object.isNewTriple()); @@ -80,48 +101,33 @@ public class TripleExternalizer implements AdvancedExternalizer<KiWiTriple> { @Override public KiWiTriple readObject(ObjectInput input) throws IOException, ClassNotFoundException { - try { - KiWiConnection con = persistence.getConnection(); - try { - KiWiTriple result = new KiWiTriple(); - result.setId(input.readLong()); - - long[] nodeIds = new long[5]; - for(int i=0; i<5; i++) { - nodeIds[0] = input.readLong(); - } - KiWiNode[] nodes = con.loadNodesByIds(nodeIds); - - result.setSubject((KiWiResource) nodes[0]); - result.setPredicate((KiWiUriResource) nodes[1]); - result.setObject(nodes[2]); - - if(nodes[3] != null) { - result.setContext((KiWiResource) nodes[3]); - } - if(nodes[4] != null) { - result.setCreator((KiWiResource) nodes[4]); - } - - result.setDeleted(input.readBoolean()); - result.setInferred(input.readBoolean()); - result.setNewTriple(input.readBoolean()); - - result.setCreated(new Date(input.readLong())); - - long deletedAt = input.readLong(); - if(deletedAt > 0) { - result.setDeletedAt(new Date(deletedAt)); - } - - - return result; - } finally { - con.commit(); - con.close(); - } - } catch (SQLException ex) { - throw new IOException(ex); + + KiWiTriple result = new KiWiTriple(); + result.setId(input.readLong()); + + int mode = input.readInt(); + if(mode == MODE_PREFIX) { + String prefix = + } + + + result.setSubject((KiWiResource) input.readObject()); + result.setPredicate((KiWiUriResource) input.readObject()); + result.setObject((KiWiNode) input.readObject()); + result.setContext((KiWiResource) input.readObject()); + result.setCreator((KiWiResource) input.readObject()); + result.setDeleted(input.readBoolean()); + result.setInferred(input.readBoolean()); + result.setNewTriple(input.readBoolean()); + + result.setCreated(new Date(input.readLong())); + + long deletedAt = input.readLong(); + if(deletedAt > 0) { + result.setDeletedAt(new Date(deletedAt)); } + + + return result; } } http://git-wip-us.apache.org/repos/asf/marmotta/blob/1698c6cb/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/remote/InfinispanRemoteCacheManager.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/remote/InfinispanRemoteCacheManager.java b/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/remote/InfinispanRemoteCacheManager.java index c40feb8..6f5034a 100644 --- a/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/remote/InfinispanRemoteCacheManager.java +++ b/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/remote/InfinispanRemoteCacheManager.java @@ -19,6 +19,9 @@ package org.apache.marmotta.kiwi.remote; import org.apache.marmotta.kiwi.caching.CacheManager; import org.apache.marmotta.kiwi.config.KiWiConfiguration; +import org.apache.marmotta.kiwi.model.rdf.*; + +import java.util.Map; /** * Implementation of an Infinispan cache manager with a remote (client-server) cache. @@ -34,7 +37,118 @@ public class InfinispanRemoteCacheManager implements CacheManager { } + /** + * Return the node id -> node cache from the cache manager. This cache is heavily used to lookup + * nodes when querying or loading triples and should therefore have a decent size (default 500.000 elements). + * + * @return an EHCache Cache instance containing the node id -> node mappings + */ + @Override + public Map<Long, KiWiNode> getNodeCache() { + return null; + } + + /** + * Return the triple id -> triple cache from the cache manager. This cache is used for speeding up the + * construction of query results. + * + * @return + */ + @Override + public Map<Long, KiWiTriple> getTripleCache() { + return null; + } + /** + * Return the uri -> KiWiUriResource cache from the cache manager. This cache is used when constructing new + * KiWiUriResources to avoid a database lookup. + * + * @return + */ + @Override + public Map<String, KiWiUriResource> getUriCache() { + return null; + } + /** + * Return the anonId -> KiWiAnonResource cache from the cache manager. This cache is used when constructing new + * KiWiAnonResources to avoid a database lookup. + * + * @return + */ + @Override + public Map<String, KiWiAnonResource> getBNodeCache() { + return null; + } + /** + * Return the literal cache key -> KiWiLiteral cache from the cache manager. This cache is used when constructing new + * KiWiLiterals to avoid a database lookup. + * + * @return + * @see org.apache.marmotta.commons.sesame.model.LiteralCommons#createCacheKey(String, java.util.Locale, String) + */ + @Override + public Map<String, KiWiLiteral> getLiteralCache() { + return null; + } + + /** + * Return the URI -> namespace cache from the cache manager. Used for looking up namespaces + * + * @return + */ + @Override + public Map<String, KiWiNamespace> getNamespaceUriCache() { + return null; + } + + /** + * Return the prefix -> namespace cache from the cache manager. Used for looking up namespaces + * + * @return + */ + @Override + public Map<String, KiWiNamespace> getNamespacePrefixCache() { + return null; + } + + /** + * Create and return the cache used by the CacheTripleRegistry. This is an unlimited synchronous replicated + * cache and should be used with care. + * + * @return + */ + @Override + public Map<Long, Long> getRegistryCache() { + return null; + } + + /** + * Get the cache with the given name from the cache manager. Can be used to request additional + * caches from the cache manager that are not covered by explicit methods. + * + * @param name + * @return + */ + @Override + public Map getCacheByName(String name) { + return null; + } + + /** + * Clear all caches managed by this cache manager. + */ + @Override + public void clear() { + + } + + /** + * Shutdown this cache manager instance. Will shutdown the underlying EHCache cache manager. + */ + @Override + public void shutdown() { + + } } http://git-wip-us.apache.org/repos/asf/marmotta/blob/1698c6cb/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java b/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java new file mode 100644 index 0000000..0b6823c --- /dev/null +++ b/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.marmotta.kiwi.test; + +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; +import org.apache.marmotta.kiwi.caching.CacheManager; +import org.apache.marmotta.kiwi.config.KiWiConfiguration; +import org.apache.marmotta.kiwi.embedded.InfinispanEmbeddedCacheManagerFactory; +import org.apache.marmotta.kiwi.persistence.h2.H2Dialect; +import org.apache.marmotta.kiwi.sail.KiWiStore; +import org.junit.*; +import org.openrdf.model.URI; +import org.openrdf.repository.Repository; +import org.openrdf.repository.RepositoryException; +import org.openrdf.repository.sail.SailRepository; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Add file description here! + * + * @author Sebastian Schaffert ([email protected]) + */ +public class ClusterTest { + + private static Logger log = LoggerFactory.getLogger(ClusterTest.class); + + KiWiConfiguration config1, config2; + + KiWiStore store1, store2; + + Repository repository1, repository2; + + CacheManager cacheManager1, cacheManager2; + + @Before + public void setup() throws RepositoryException { + config1 = new KiWiConfiguration( + "default-H2", + "jdbc:h2:mem:kiwitest;MVCC=true;DB_CLOSE_ON_EXIT=TRUE;DB_CLOSE_DELAY=-1", + "kiwi", "kiwi", + new H2Dialect()); + config1.setDatacenterId(1); + config1.setClustered(true); + config1.setCacheManagerFactory(InfinispanEmbeddedCacheManagerFactory.class.getName()); + + config2 = new KiWiConfiguration( + "default-H2", + "jdbc:h2:mem:kiwitest;MVCC=true;DB_CLOSE_ON_EXIT=TRUE;DB_CLOSE_DELAY=-1", + "kiwi", "kiwi", + new H2Dialect()); + config2.setDatacenterId(2); + config2.setClustered(true); + config2.setCacheManagerFactory(InfinispanEmbeddedCacheManagerFactory.class.getName()); + + + + } + + public void setupCluster(int port1, int port2) throws RepositoryException { + config1.setClusterPort(port1); + config2.setClusterPort(port2); + + store1 = new KiWiStore(config1); + store2 = new KiWiStore(config2); + + repository1 = new SailRepository(store1); + repository2 = new SailRepository(store2); + + repository1.initialize(); + repository2.initialize(); + + cacheManager1 = store1.getPersistence().getCacheManager(); + cacheManager2 = store2.getPersistence().getCacheManager(); + } + + + @After + public void teardown() throws RepositoryException { + repository1.shutDown(); + repository2.shutDown(); + } + + + @Test + @Ignore + public void testClusteredCacheSync() throws InterruptedException, RepositoryException { + setupCluster(61222,61222); + + log.info("testing cache synchronization ..."); + + URI u = repository1.getValueFactory().createURI("http://localhost/test1"); + + + // give the cluster some time to performance asynchronous balancing + Thread.sleep(100); + + + log.debug("test if resource is in cache where it was created ..."); + URI u1 = (URI) cacheManager1.getUriCache().get(createCacheKey("http://localhost/test1")); + + Assert.assertNotNull(u1); + Assert.assertEquals(u,u1); + + log.debug("test if resource has been synced to other cache in cluster ..."); + URI u2 = (URI) cacheManager2.getUriCache().get(createCacheKey("http://localhost/test1")); + + Assert.assertNotNull(u2); + Assert.assertEquals(u,u2); + } + + @Test + @Ignore + public void testDisjointClusters() throws InterruptedException, RepositoryException { + setupCluster(61224,61225); + + log.info("testing caches on different ports ..."); + + URI u = repository1.getValueFactory().createURI("http://localhost/test1"); + + + // give the cluster some time to performance asynchronous balancing + Thread.sleep(100); + + log.debug("test if resource is in cache where it was created ..."); + URI u1 = (URI) cacheManager1.getUriCache().get(createCacheKey("http://localhost/test1")); + + Assert.assertNotNull(u1); + Assert.assertEquals(u,u1); + + log.debug("test if resource has been synced to other cache in cluster ..."); + URI u2 = (URI) cacheManager2.getUriCache().get(createCacheKey("http://localhost/test1")); + + Assert.assertNull(u2); + } + + + private static Long createCacheKey(String svalue) { + Hasher hasher = Hashing.goodFastHash(64).newHasher(); + hasher.putString(svalue); + return hasher.hash().asLong(); + } + +} http://git-wip-us.apache.org/repos/asf/marmotta/blob/1698c6cb/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/externalizer/ExternalizerTest.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/externalizer/ExternalizerTest.java b/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/externalizer/ExternalizerTest.java index fe7b701..9f52813 100644 --- a/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/externalizer/ExternalizerTest.java +++ b/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/externalizer/ExternalizerTest.java @@ -18,10 +18,8 @@ package org.apache.marmotta.kiwi.test.externalizer; import org.apache.commons.lang3.RandomStringUtils; -import org.apache.marmotta.kiwi.model.rdf.KiWiAnonResource; -import org.apache.marmotta.kiwi.model.rdf.KiWiIntLiteral; -import org.apache.marmotta.kiwi.model.rdf.KiWiStringLiteral; -import org.apache.marmotta.kiwi.model.rdf.KiWiUriResource; +import org.apache.marmotta.kiwi.externalizer.*; +import org.apache.marmotta.kiwi.model.rdf.*; import org.apache.marmotta.kiwi.test.TestValueFactory; import org.infinispan.commons.marshall.AdvancedExternalizer; import org.infinispan.commons.marshall.StreamingMarshaller; @@ -123,6 +121,16 @@ public class ExternalizerTest { } + @Test + public void testTriple() throws Exception { + KiWiUriResource s = (KiWiUriResource) valueFactory.createURI("http://localhost/" + RandomStringUtils.randomAlphanumeric(8)); + KiWiUriResource p = (KiWiUriResource) valueFactory.createURI("http://localhost/" + RandomStringUtils.randomAlphanumeric(8)); + KiWiNode o = (KiWiNode) randomNode(); + KiWiTriple t = (KiWiTriple) valueFactory.createStatement(s,p,o); + + marshall(t, new TripleExternalizer()); + } + /** * Run the given object through the marshaller using an in-memory stream. * @param origin @@ -130,6 +138,17 @@ public class ExternalizerTest { * @return */ private <T> void marshall(T origin, AdvancedExternalizer<T> externalizer) throws IOException, ClassNotFoundException, InterruptedException { + log.info("- testing Java ObjectStream ..."); + ByteArrayOutputStream outBytesOS = new ByteArrayOutputStream(); + ObjectOutputStream outOS = new ObjectOutputStream(outBytesOS); + + outOS.writeObject(origin); + + outOS.close(); + + log.info(" object {}: serialized with {} bytes", origin, outBytesOS.size()); + + log.info("- testing externalizer directly ..."); ByteArrayOutputStream outBytes = new ByteArrayOutputStream(); ObjectOutputStream out = new ObjectOutputStream(outBytes); http://git-wip-us.apache.org/repos/asf/marmotta/blob/1698c6cb/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java deleted file mode 100644 index 5f4d719..0000000 --- a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.marmotta.kiwi.test; - -import com.google.common.hash.Hasher; -import com.google.common.hash.Hashing; -import org.apache.marmotta.kiwi.caching.KiWiCacheManager; -import org.apache.marmotta.kiwi.config.KiWiConfiguration; -import org.apache.marmotta.kiwi.persistence.h2.H2Dialect; -import org.apache.marmotta.kiwi.sail.KiWiStore; -import org.junit.*; -import org.openrdf.model.URI; -import org.openrdf.repository.Repository; -import org.openrdf.repository.RepositoryException; -import org.openrdf.repository.sail.SailRepository; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Add file description here! - * - * @author Sebastian Schaffert ([email protected]) - */ -public class ClusterTest { - - private static Logger log = LoggerFactory.getLogger(ClusterTest.class); - - KiWiConfiguration config1, config2; - - KiWiStore store1, store2; - - Repository repository1, repository2; - - KiWiCacheManager cacheManager1, cacheManager2; - - @Before - public void setup() throws RepositoryException { - config1 = new KiWiConfiguration( - "default-H2", - "jdbc:h2:mem:kiwitest;MVCC=true;DB_CLOSE_ON_EXIT=TRUE;DB_CLOSE_DELAY=-1", - "kiwi", "kiwi", - new H2Dialect()); - config1.setDatacenterId(1); - config1.setClustered(true); - - config2 = new KiWiConfiguration( - "default-H2", - "jdbc:h2:mem:kiwitest;MVCC=true;DB_CLOSE_ON_EXIT=TRUE;DB_CLOSE_DELAY=-1", - "kiwi", "kiwi", - new H2Dialect()); - config2.setDatacenterId(2); - config2.setClustered(true); - - - - } - - public void setupCluster(int port1, int port2) throws RepositoryException { - config1.setClusterPort(port1); - config2.setClusterPort(port2); - - store1 = new KiWiStore(config1); - store2 = new KiWiStore(config2); - - repository1 = new SailRepository(store1); - repository2 = new SailRepository(store2); - - repository1.initialize(); - repository2.initialize(); - - cacheManager1 = store1.getPersistence().getCacheManager(); - cacheManager2 = store2.getPersistence().getCacheManager(); - } - - - @After - public void teardown() throws RepositoryException { - repository1.shutDown(); - repository2.shutDown(); - } - - - @Test - @Ignore - public void testClusteredCacheSync() throws InterruptedException, RepositoryException { - setupCluster(61222,61222); - - log.info("testing cache synchronization ..."); - - URI u = repository1.getValueFactory().createURI("http://localhost/test1"); - - - // give the cluster some time to performance asynchronous balancing - Thread.sleep(100); - - - log.debug("test if resource is in cache where it was created ..."); - URI u1 = (URI) cacheManager1.getUriCache().get(createCacheKey("http://localhost/test1")); - - Assert.assertNotNull(u1); - Assert.assertEquals(u,u1); - - log.debug("test if resource has been synced to other cache in cluster ..."); - URI u2 = (URI) cacheManager2.getUriCache().get(createCacheKey("http://localhost/test1")); - - Assert.assertNotNull(u2); - Assert.assertEquals(u,u2); - } - - @Test - @Ignore - public void testDisjointClusters() throws InterruptedException, RepositoryException { - setupCluster(61224,61225); - - log.info("testing caches on different ports ..."); - - URI u = repository1.getValueFactory().createURI("http://localhost/test1"); - - - // give the cluster some time to performance asynchronous balancing - Thread.sleep(100); - - log.debug("test if resource is in cache where it was created ..."); - URI u1 = (URI) cacheManager1.getUriCache().get(createCacheKey("http://localhost/test1")); - - Assert.assertNotNull(u1); - Assert.assertEquals(u,u1); - - log.debug("test if resource has been synced to other cache in cluster ..."); - URI u2 = (URI) cacheManager2.getUriCache().get(createCacheKey("http://localhost/test1")); - - Assert.assertNull(u2); - } - - - private static Long createCacheKey(String svalue) { - Hasher hasher = Hashing.goodFastHash(64).newHasher(); - hasher.putString(svalue); - return hasher.hash().asLong(); - } - -}
