Updated Branches: refs/heads/develop 5efe5b0ad -> dbb13f657
started working on a separate kiwi-caching module with support for query caching Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/7befade4 Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/7befade4 Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/7befade4 Branch: refs/heads/develop Commit: 7befade45c572d65b449850b121f022ef2a150de Parents: 5efe5b0 Author: Sebastian Schaffert <[email protected]> Authored: Wed Dec 18 20:58:20 2013 +0100 Committer: Sebastian Schaffert <[email protected]> Committed: Wed Dec 18 20:58:20 2013 +0100 ---------------------------------------------------------------------- libraries/kiwi/kiwi-caching/pom.xml | 172 ++++++++++ .../config/KiWiQueryCacheConfiguration.java | 69 ++++ .../caching/iteration/BufferingIteration.java | 118 +++++++ .../caching/iteration/CachingIteration.java | 130 ++++++++ .../kiwi/caching/sail/KiWiCachingSail.java | 117 +++++++ .../caching/sail/KiWiCachingSailConnection.java | 315 +++++++++++++++++++ .../KiWiCachingRepositoryConnectionTest.java | 57 ++++ .../caching/test/KiWiCachingRepositoryTest.java | 61 ++++ .../marmotta/kiwi/caching/KiWiCacheManager.java | 28 +- libraries/kiwi/pom.xml | 1 + 10 files changed, 1066 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/marmotta/blob/7befade4/libraries/kiwi/kiwi-caching/pom.xml ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-caching/pom.xml b/libraries/kiwi/kiwi-caching/pom.xml new file mode 100644 index 0000000..2ab4417 --- /dev/null +++ b/libraries/kiwi/kiwi-caching/pom.xml @@ -0,0 +1,172 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. 
See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.marmotta</groupId> + <artifactId>kiwi-parent</artifactId> + <version>3.2.0-SNAPSHOT</version> + <relativePath>../</relativePath> + </parent> + + <artifactId>kiwi-caching</artifactId> + <packaging>jar</packaging> + + <name>KiWi Triplestore: Caching</name> + <description> + Provides transparent query caching on top of the KiWi triplestore. Queries are cached using the Infinispan + distributed caching system using the EmbeddedCacheManager used by the triplestore itself. 
+ </description> + + <dependencies> + <dependency> + <groupId>org.apache.marmotta</groupId> + <artifactId>kiwi-triplestore</artifactId> + </dependency> + <dependency> + <groupId>org.infinispan</groupId> + <artifactId>infinispan-core</artifactId> + </dependency> + + <!-- Logging --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>log4j-over-slf4j</artifactId> + </dependency> + + <!-- Sesame dependencies --> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-model</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-sail-api</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-repository-sail</artifactId> + </dependency> + + <!-- Utilities --> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + <dependency> + <groupId>org.apache.marmotta</groupId> + <artifactId>marmotta-commons</artifactId> + </dependency> + <dependency> + <groupId>org.apache.marmotta</groupId> + <artifactId>sesame-filter</artifactId> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + + + <!-- Testing --> + <dependency> + <artifactId>junit</artifactId> + <groupId>junit</groupId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.marmotta</groupId> + <artifactId>kiwi-triplestore</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <artifactId>hamcrest-core</artifactId> + <groupId>org.hamcrest</groupId> + <scope>test</scope> + </dependency> + <dependency> + <artifactId>hamcrest-library</artifactId> + <groupId>org.hamcrest</groupId> + <scope>test</scope> + </dependency> + 
<dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.h2database</groupId> + <artifactId>h2</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.postgresql</groupId> + <artifactId>postgresql</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>mysql</groupId> + <artifactId>mysql-connector-java</artifactId> + <scope>test</scope> + <optional>true</optional> <!-- GPL licensed, no dependency --> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-rio-api</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-rio-rdfxml</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-queryparser-api</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-queryparser-sparql</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-store-testsuite</artifactId> + <scope>test</scope> + </dependency> + + </dependencies> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/marmotta/blob/7befade4/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/config/KiWiQueryCacheConfiguration.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/config/KiWiQueryCacheConfiguration.java b/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/config/KiWiQueryCacheConfiguration.java new file mode 100644 index 0000000..518e82e --- /dev/null +++ 
b/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/config/KiWiQueryCacheConfiguration.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.marmotta.kiwi.caching.config; + +/** + * Configuration object for all query caching options that are configurable. + * + * @author Sebastian Schaffert ([email protected]) + */ +public class KiWiQueryCacheConfiguration { + + + /** + * Maximum size of results to cache. Results bigger than this value will not be cached. + */ + private int maxEntrySize = 150; + + + /** + * Maximum number of entries to keep in the cache. + */ + private int maxCacheSize = 100000; + + public KiWiQueryCacheConfiguration() { + } + + /** + * Maximum size of results to cache. Results bigger than this value will not be cached. + */ + public int getMaxEntrySize() { + return maxEntrySize; + } + + /** + * Maximum size of results to cache. Results bigger than this value will not be cached. + */ + public void setMaxEntrySize(int maxEntrySize) { + this.maxEntrySize = maxEntrySize; + } + + /** + * Maximum number of entries to keep in the cache. 
+ */ + public int getMaxCacheSize() { + return maxCacheSize; + } + + /** + * Maximum number of entries to keep in the cache. + */ + public void setMaxCacheSize(int maxCacheSize) { + this.maxCacheSize = maxCacheSize; + } +} http://git-wip-us.apache.org/repos/asf/marmotta/blob/7befade4/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/iteration/BufferingIteration.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/iteration/BufferingIteration.java b/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/iteration/BufferingIteration.java new file mode 100644 index 0000000..9a87df4 --- /dev/null +++ b/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/iteration/BufferingIteration.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.marmotta.kiwi.caching.iteration; + +import info.aduna.iteration.CloseableIteration; +import info.aduna.iteration.CloseableIterationBase; + +import java.util.ArrayList; +import java.util.List; + +/** + * Add file description here! 
+ * + * @author Sebastian Schaffert ([email protected]) + */ +public class BufferingIteration<E,X extends Exception> extends CloseableIterationBase<E,X> implements CloseableIteration<E,X> { + + private int limit = 150; + + private List<E> buffer; + + private CloseableIteration<? extends E,X> wrapped; + + public BufferingIteration(int limit, CloseableIteration<? extends E, X> wrapped) { + this.limit = limit; + + this.wrapped = wrapped; + this.buffer = new ArrayList<>(limit); + } + + /** + * Returns <tt>true</tt> if the iteration has more elements. (In other + * words, returns <tt>true</tt> if {@link #next} would return an element + * rather than throwing a <tt>NoSuchElementException</tt>.) + * + * @return <tt>true</tt> if the iteration has more elements. + * @throws X + */ + @Override + public boolean hasNext() throws X { + return wrapped.hasNext(); + } + + /** + * Returns the next element in the iteration. + * + * @return the next element in the iteration. + */ + @Override + public E next() throws X { + E n = wrapped.next(); + + if(buffer != null && buffer.size() < limit) { + buffer.add(n); + } else { + buffer = null; + } + + return n; + } + + /** + * Removes from the underlying collection the last element returned by the + * iteration (optional operation). This method can be called only once per + * call to next. + * + * @throws UnsupportedOperationException if the remove operation is not supported by this Iteration. + * @throws IllegalStateException If the Iteration has been closed, or if <tt>next()</tt> has not + * yet been called, or <tt>remove()</tt> has already been called + * after the last call to <tt>next()</tt>. + */ + @Override + public void remove() throws X { + wrapped.remove(); + buffer.remove(buffer.size() - 1); + } + + /** + * Called by {@link #close} when it is called for the first time. This method + * is only called once on each iteration. By default, this method does + * nothing. 
+ * + * @throws X + */ + @Override + protected void handleClose() throws X { + + super.handleClose(); + } + + /** + * Return the buffer contents (or null if the buffer has reached its limit) + * + * @return + */ + public List<E> getBuffer() { + return buffer; + } + + public int getLimit() { + return limit; + } +} http://git-wip-us.apache.org/repos/asf/marmotta/blob/7befade4/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/iteration/CachingIteration.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/iteration/CachingIteration.java b/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/iteration/CachingIteration.java new file mode 100644 index 0000000..067e00b --- /dev/null +++ b/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/iteration/CachingIteration.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.marmotta.kiwi.caching.iteration; + +import info.aduna.iteration.CloseableIteration; +import info.aduna.iteration.CloseableIterationBase; +import info.aduna.iteration.CloseableIteratorIteration; + +import java.util.List; + +/** + * Add file description here! + * + * @author Sebastian Schaffert ([email protected]) + */ +public class CachingIteration<E,X extends Exception> extends CloseableIterationBase<E,X> implements CloseableIteration<E,X> { + + private CloseableIteration<E,X> wrapped; + + private CacheFunction<E> cacheFunction; + + public CachingIteration(CacheFunction<E> cacheFunction,BufferingIterationProducer<E, X> producer) throws X { + + this.cacheFunction = cacheFunction; + + List<E> cached = cacheFunction.getResult(); + if(cached != null) { + this.wrapped = new CloseableIteratorIteration<>(cached.iterator()); + } else { + this.wrapped = producer.getIteration(); + } + } + + /** + * Returns <tt>true</tt> if the iteration has more elements. (In other + * words, returns <tt>true</tt> if {@link #next} would return an element + * rather than throwing a <tt>NoSuchElementException</tt>.) + * + * @return <tt>true</tt> if the iteration has more elements. + * @throws X + */ + @Override + public boolean hasNext() throws X { + return wrapped.hasNext(); + } + + /** + * Returns the next element in the iteration. + * + * @return the next element in the iteration. + */ + @Override + public E next() throws X { + return wrapped.next(); + } + + /** + * Removes from the underlying collection the last element returned by the + * iteration (optional operation). This method can be called only once per + * call to next. + * + * @throws UnsupportedOperationException if the remove operation is not supported by this Iteration. + * @throws IllegalStateException If the Iteration has been closed, or if <tt>next()</tt> has not + * yet been called, or <tt>remove()</tt> has already been called + * after the last call to <tt>next()</tt>. 
+ */ + @Override + public void remove() throws X { + wrapped.remove(); + } + + /** + * Called by {@link #close} when it is called for the first time. This method + * is only called once on each iteration. By default, this method does + * nothing. + * + * @throws X + */ + @Override + protected void handleClose() throws X { + if(wrapped instanceof BufferingIteration && ((BufferingIteration) wrapped).getBuffer() != null) { + cacheFunction.cacheResult(((BufferingIteration) wrapped).getBuffer()); + } + + super.handleClose(); + } + + + public static interface BufferingIterationProducer<E,X extends Exception> { + + /** + * This method should lazily create the iteration wrapped by this caching iteration. + * @return + */ + public BufferingIteration<E,X> getIteration() throws X; + + } + + + public static interface CacheFunction<E> { + + /** + * Return the cached result for this iteration (or null in case there is no cached result) + */ + public List<E> getResult(); + + /** + * Cache the result of this iteration. + * + * @param buffer + */ + public void cacheResult(List<E> buffer); + + } +} http://git-wip-us.apache.org/repos/asf/marmotta/blob/7befade4/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/sail/KiWiCachingSail.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/sail/KiWiCachingSail.java b/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/sail/KiWiCachingSail.java new file mode 100644 index 0000000..8e4f34a --- /dev/null +++ b/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/sail/KiWiCachingSail.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.marmotta.kiwi.caching.sail; + +import org.apache.marmotta.kiwi.caching.config.KiWiQueryCacheConfiguration; +import org.apache.marmotta.kiwi.sail.KiWiStore; +import org.infinispan.Cache; +import org.infinispan.configuration.cache.Configuration; +import org.infinispan.configuration.cache.ConfigurationBuilder; +import org.infinispan.manager.EmbeddedCacheManager; +import org.infinispan.transaction.TransactionMode; +import org.infinispan.transaction.lookup.GenericTransactionManagerLookup; +import org.openrdf.sail.NotifyingSail; +import org.openrdf.sail.NotifyingSailConnection; +import org.openrdf.sail.Sail; +import org.openrdf.sail.SailException; +import org.openrdf.sail.helpers.NotifyingSailWrapper; +import org.openrdf.sail.helpers.SailWrapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +/** + * A sail wrapper for KiWi stores that introduces transparent query caching using Infinispan distributed caches. 
+ * + * @author Sebastian Schaffert ([email protected]) + */ +public class KiWiCachingSail extends NotifyingSailWrapper { + + public static final String QUERY_CACHE = "query-cache"; + + private static Logger log = LoggerFactory.getLogger(KiWiCachingSail.class); + + private KiWiStore parent; + + + private EmbeddedCacheManager cacheManager; + + private KiWiQueryCacheConfiguration configuration; + + /** + * Creates a new SailWrapper that wraps the supplied Sail. + * + * @param baseSail + */ + public KiWiCachingSail(NotifyingSail baseSail, KiWiQueryCacheConfiguration configuration) { + super(baseSail); + + this.parent = getRootSail(baseSail); + this.cacheManager = parent.getPersistence().getCacheManager().getCacheManager(); + this.configuration = configuration; + } + + + @Override + public NotifyingSailConnection getConnection() throws SailException { + return new KiWiCachingSailConnection(super.getConnection(), getQueryCache(), configuration.getMaxEntrySize()); + } + + + + /** + * Return the query key -> query result cache from the cache manager. This cache is used for speeding up the + * listing of query results. + * + * @return + */ + private Cache getQueryCache() { + if(!cacheManager.cacheExists(QUERY_CACHE)) { + Configuration tripleConfiguration = new ConfigurationBuilder().read(cacheManager.getDefaultCacheConfiguration()) + .transaction() + .transactionMode(TransactionMode.TRANSACTIONAL) + .transactionManagerLookup(new GenericTransactionManagerLookup()) + .eviction() + . 
maxEntries(configuration.getMaxCacheSize()) + .expiration() + .lifespan(60, TimeUnit.MINUTES) + .maxIdle(30, TimeUnit.MINUTES) + .build(); + cacheManager.defineConfiguration(QUERY_CACHE, tripleConfiguration); + } + return cacheManager.getCache(QUERY_CACHE); + } + + + /** + * Get the root sail in the wrapped sail stack + * @param sail + * @return + */ + private KiWiStore getRootSail(Sail sail) { + if(sail instanceof KiWiStore) { + return (KiWiStore) sail; + } else if(sail instanceof SailWrapper) { + return getRootSail(((SailWrapper) sail).getBaseSail()); + } else { + throw new IllegalArgumentException("root sail is not a KiWiStore or could not be found"); + } + } + +} http://git-wip-us.apache.org/repos/asf/marmotta/blob/7befade4/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/sail/KiWiCachingSailConnection.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/sail/KiWiCachingSailConnection.java b/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/sail/KiWiCachingSailConnection.java new file mode 100644 index 0000000..25af151 --- /dev/null +++ b/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/sail/KiWiCachingSailConnection.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.marmotta.kiwi.caching.sail; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; +import info.aduna.iteration.CloseableIteration; +import info.aduna.iteration.Iteration; +import info.aduna.iteration.UnionIteration; +import org.apache.marmotta.commons.sesame.tripletable.IntArray; +import org.apache.marmotta.kiwi.caching.iteration.BufferingIteration; +import org.apache.marmotta.kiwi.caching.iteration.CachingIteration; +import org.apache.marmotta.kiwi.model.rdf.KiWiTriple; +import org.infinispan.Cache; +import org.openrdf.model.Resource; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.sail.NotifyingSailConnection; +import org.openrdf.sail.SailException; +import org.openrdf.sail.UpdateContext; +import org.openrdf.sail.helpers.NotifyingSailConnectionWrapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.transaction.*; +import java.nio.IntBuffer; +import java.util.*; + +/** + * Add file description here! 
+ * + * @author Sebastian Schaffert ([email protected]) + */ +public class KiWiCachingSailConnection extends NotifyingSailConnectionWrapper { + + private static Logger log = LoggerFactory.getLogger(KiWiCachingSailConnection.class); + + private Cache<IntArray,List<Statement>> queryCache; + + // a dummy default context to work around the double meaning of the null value + private final static URI defaultContext = new URIImpl("http://marmotta.apache.org/contexts/default"); + + private int limit = 150; + + public KiWiCachingSailConnection(NotifyingSailConnection wrappedCon, Cache<IntArray, List<Statement>> queryCache, int limit) { + super(wrappedCon); + + this.queryCache = queryCache; + this.limit = limit; + + try { + queryCache.getAdvancedCache().getTransactionManager().begin(); + } catch (NotSupportedException | SystemException e) { + log.error("error starting cache transaction: ",e); + } + } + + + @Override + public CloseableIteration<? extends Statement, SailException> getStatements(final Resource subj, final URI pred, final Value obj, final boolean includeInferred, final Resource... contexts) throws SailException { + List<Iteration<? 
extends Statement, SailException>> cResults = new ArrayList<>(contexts.length + 1); + for(final Resource context : resolveContexts(contexts)) { + cResults.add(new CachingIteration<>( + new CachingIteration.CacheFunction<Statement>() { + @Override + public List<Statement> getResult() { + return listTriples(subj,pred,obj,context, includeInferred); + } + + @Override + public void cacheResult(List<Statement> buffer) { + cacheTriples(subj,pred,obj,context,includeInferred,buffer); + } + }, + new CachingIteration.BufferingIterationProducer<Statement, SailException>() { + @Override + public BufferingIteration<Statement, SailException> getIteration() throws SailException { + return new BufferingIteration<>(limit, KiWiCachingSailConnection.super.getStatements(subj, pred, obj, includeInferred, contexts)); + } + } + )); + } + + return new UnionIteration<Statement, SailException>(cResults); + + } + + @Override + public void addStatement(Resource subj, URI pred, Value obj, Resource... contexts) throws SailException { + tripleUpdated(subj, pred, obj, contexts); + + super.addStatement(subj, pred, obj, contexts); + } + + @Override + public void removeStatements(Resource subj, URI pred, Value obj, Resource... contexts) throws SailException { + tripleUpdated(subj, pred, obj, contexts); + + super.removeStatements(subj, pred, obj, contexts); + } + + @Override + public void addStatement(UpdateContext modify, Resource subj, URI pred, Value obj, Resource... contexts) throws SailException { + tripleUpdated(subj, pred, obj, contexts); + + super.addStatement(modify, subj, pred, obj, contexts); + } + + @Override + public void removeStatement(UpdateContext modify, Resource subj, URI pred, Value obj, Resource... 
contexts) throws SailException { + tripleUpdated(subj, pred, obj, contexts); + + super.removeStatement(modify, subj, pred, obj, contexts); + } + + @Override + public void commit() throws SailException { + try { + queryCache.getAdvancedCache().getTransactionManager().commit(); + } catch (RollbackException | HeuristicMixedException | HeuristicRollbackException | SystemException e) { + log.error("error committing cache transaction: ",e); + } + + super.commit(); + } + + @Override + public void rollback() throws SailException { + try { + queryCache.getAdvancedCache().getTransactionManager().rollback(); + } catch (SystemException e) { + log.error("error rolling back cache transaction: ",e); + } + + super.rollback(); + } + + + private Set<Resource> resolveContexts(Resource... contexts) { + if(contexts.length == 0) { + return Collections.singleton((Resource)defaultContext); + } else { + return Sets.newHashSet(contexts); + } + } + + /** + * Look up a triple query in the query cache. Returns the result set if the query is found in the cache, returns + * null if the query is not found. + * + * @param subject the subject of the triples to list or null for wildcard + * @param property the property of the triples to list or null for wildcard + * @param object the object of the triples to list or null for wildcard + * @param context the context/knowledge space of the triples to list or null for all spaces + * @param inferred if true, inferred triples are included in the result; if false not + * @return the result set if the query is found in the cache, returns null if the query is not found + */ + @SuppressWarnings("unchecked") + private List<Statement> listTriples(Resource subject, URI property, Value object, Resource context, boolean inferred) { + IntArray key = createCacheKey(subject,property,object,context,inferred); + if(queryCache.get(key) != null) return queryCache.get(key); + else + return null; + } + + + /** + * Cache the result of a triple query in the query cache. 
+     *
+     * @param subject the subject of the triples to list or null for wildcard
+     * @param property the property of the triples to list or null for wildcard
+     * @param object the object of the triples to list or null for wildcard
+     * @param context the context/knowledge space of the triples to list or null for all spaces
+     * @param inferred if true, inferred triples are included in the result; if false not
+     * @param result the result of the triple query to cache
+     */
+    private void cacheTriples(Resource subject, URI property, Value object, Resource context, boolean inferred, List<Statement> result) {
+
+        // cache the query result under the key of the query pattern itself
+        queryCache.put(createCacheKey(subject, property, object, context, inferred), result);
+
+        // cache every KiWiTriple of the result as the single answer to its own fully bound pattern
+        for (Statement stmt : result) {
+            if (stmt instanceof KiWiTriple) {
+                KiWiTriple triple = (KiWiTriple) stmt;
+                IntArray tripleKey = createCacheKey(triple.getSubject(), triple.getPredicate(), triple.getObject(), triple.getContext(), triple.isInferred());
+                queryCache.put(tripleKey, ImmutableList.of(stmt));
+            }
+        }
+
+        // special optimisation: when only the subject (and optionally context) is given, we also fill the caches for
+        // all property values
+        if (subject != null && property == null && object == null) {
+            // group the result by predicate, then cache each group under the (subject, predicate) pattern
+            Map<URI, List<Statement>> properties = new HashMap<>();
+            for (Statement triple : result) {
+                List<Statement> values = properties.get(triple.getPredicate());
+                if (values == null) {
+                    values = new LinkedList<>();
+                    properties.put(triple.getPredicate(), values);
+                }
+                values.add(triple);
+            }
+            for (Map.Entry<URI, List<Statement>> entry : properties.entrySet()) {
+                queryCache.put(createCacheKey(subject, entry.getKey(), null, context, inferred), entry.getValue());
+            }
+        }
+    }
+
+
+    /**
+     * Clear all contents of the query cache.
+     */
+    private void clearAll() {
+        queryCache.clear();
+    }
+
+
+    /**
+     * Notify the cache that the triple passed as argument has been updated and that all cache entries affected by
+     * the triple update need to be cleared.
+     * <p/>
+     * Removes the entry of every query pattern obtained by replacing any subset of subject, predicate and object
+     * by the wildcard, once without context and once for each of the given contexts, and in each case for both
+     * values of the inferred flag.
+     * <p/>
+     * NOTE(review): entries cached for a specific context are only removed for the contexts listed in
+     * {@code contexts}; confirm that callers always pass the affected contexts or fall back to clearing the
+     * whole cache.
+     *
+     * @param subject   the subject of the updated triple
+     * @param predicate the predicate of the updated triple
+     * @param object    the object of the updated triple
+     * @param contexts  the contexts for which context-specific cache entries are removed in addition to the
+     *                  entries cached without context
+     */
+    private void tripleUpdated(Resource subject, URI predicate, Value object, Resource... contexts) {
+        // entries of queries that did not restrict the context
+        removePatterns(subject, predicate, object, null);
+
+        // entries of queries restricted to one of the given contexts
+        for (Resource context : contexts) {
+            removePatterns(subject, predicate, object, context);
+        }
+    }
+
+    /**
+     * Remove the cache entries of all eight bound/wildcard combinations of subject, predicate and object for the
+     * given context (null for queries without context), each for inferred = false and inferred = true.
+     */
+    private void removePatterns(Resource subject, URI predicate, Value object, Resource context) {
+        // bit 0: subject bound, bit 1: predicate bound, bit 2: object bound; an unset bit means wildcard
+        for (int mask = 0; mask < 8; mask++) {
+            Resource s = (mask & 1) != 0 ? subject : null;
+            URI p = (mask & 2) != 0 ? predicate : null;
+            Value o = (mask & 4) != 0 ? object : null;
+
+            queryCache.remove(createCacheKey(s, p, o, context, false));
+            queryCache.remove(createCacheKey(s, p, o, context, true));
+        }
+    }
+
+
+    /**
+     * Create the cache key for a query pattern. The key wraps five ints: the hash codes of subject, property,
+     * object and context (Integer.MIN_VALUE where the component is null, i.e. a wildcard), followed by 1 or 0
+     * for the inferred flag.
+     * <p/>
+     * NOTE(review): only hash codes enter the key, so two different patterns whose components have equal hash
+     * codes produce the same int sequence, as do a wildcard and a component whose hash code happens to be
+     * Integer.MIN_VALUE; whether this can surface as a wrong cache hit depends on how IntArray compares
+     * keys - TODO confirm.
+     *
+     * @param subject  the subject of the pattern or null for wildcard
+     * @param property the property of the pattern or null for wildcard
+     * @param object   the object of the pattern or null for wildcard
+     * @param context  the context of the pattern or null for wildcard
+     * @param inferred the inferred flag of the pattern
+     * @return the cache key for the pattern
+     */
+    private static IntArray createCacheKey(Resource subject, URI property, Value object, Resource context, boolean inferred) {
+        int s = subject != null ? subject.hashCode() : Integer.MIN_VALUE;
+        int p = property != null ? property.hashCode() : Integer.MIN_VALUE;
+        int o = object != null ? object.hashCode() : Integer.MIN_VALUE;
+        int c = context != null ? context.hashCode() : Integer.MIN_VALUE;
+
+        return new IntArray(new int[] { s, p, o, c, inferred ? 1 : 0 });
+    }
+
+
+}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/7befade4/libraries/kiwi/kiwi-caching/src/test/java/org/apache/marmotta/kiwi/caching/test/KiWiCachingRepositoryConnectionTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-caching/src/test/java/org/apache/marmotta/kiwi/caching/test/KiWiCachingRepositoryConnectionTest.java b/libraries/kiwi/kiwi-caching/src/test/java/org/apache/marmotta/kiwi/caching/test/KiWiCachingRepositoryConnectionTest.java
new file mode 100644
index 0000000..aaa7d3c
--- /dev/null
+++ b/libraries/kiwi/kiwi-caching/src/test/java/org/apache/marmotta/kiwi/caching/test/KiWiCachingRepositoryConnectionTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.marmotta.kiwi.caching.test;
+
+import org.apache.marmotta.kiwi.caching.config.KiWiQueryCacheConfiguration;
+import org.apache.marmotta.kiwi.caching.sail.KiWiCachingSail;
+import org.apache.marmotta.kiwi.config.KiWiConfiguration;
+import org.apache.marmotta.kiwi.sail.KiWiStore;
+import org.apache.marmotta.kiwi.test.junit.KiWiDatabaseRunner;
+import org.junit.runner.RunWith;
+import org.openrdf.repository.Repository;
+import org.openrdf.repository.RepositoryConnectionTest;
+import org.openrdf.repository.sail.SailRepository;
+
+/**
+ * Run the {@link RepositoryConnectionTest}s against a {@link SailRepository} wrapping a {@link KiWiCachingSail} on top of a {@link KiWiStore}.
+ * @author Jakob Frank <[email protected]>
+ *
+ */
+@RunWith(KiWiDatabaseRunner.class)
+public class KiWiCachingRepositoryConnectionTest extends RepositoryConnectionTest {
+
+    private final KiWiConfiguration config; // configuration passed to the constructor, used to create the store
+
+    public KiWiCachingRepositoryConnectionTest(KiWiConfiguration config) {
+        this.config = config;
+    }
+
+    /* (non-Javadoc)
+     * @see org.openrdf.repository.RepositoryConnectionTest#createRepository()
+     */
+    @Override
+    protected Repository createRepository() throws Exception {
+        config.setDefaultContext(null); // the tests run with the default context of the configuration set to null
+        KiWiStore store = new KiWiStore(config);
+        store.setDropTablesOnShutdown(true); // presumably makes the store drop its tables on shutdown - confirm in KiWiStore
+
+        KiWiCachingSail cache = new KiWiCachingSail(store, new KiWiQueryCacheConfiguration()); // caching sail on top of the store, with a freshly constructed query cache configuration
+        return new SailRepository(cache);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/7befade4/libraries/kiwi/kiwi-caching/src/test/java/org/apache/marmotta/kiwi/caching/test/KiWiCachingRepositoryTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-caching/src/test/java/org/apache/marmotta/kiwi/caching/test/KiWiCachingRepositoryTest.java b/libraries/kiwi/kiwi-caching/src/test/java/org/apache/marmotta/kiwi/caching/test/KiWiCachingRepositoryTest.java
new file mode 100644
index 0000000..9b13279
--- /dev/null
+++
b/libraries/kiwi/kiwi-caching/src/test/java/org/apache/marmotta/kiwi/caching/test/KiWiCachingRepositoryTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.marmotta.kiwi.caching.test;
+
+import org.apache.marmotta.kiwi.caching.config.KiWiQueryCacheConfiguration;
+import org.apache.marmotta.kiwi.caching.sail.KiWiCachingSail;
+import org.apache.marmotta.kiwi.config.KiWiConfiguration;
+import org.apache.marmotta.kiwi.sail.KiWiStore;
+import org.apache.marmotta.kiwi.test.junit.KiWiDatabaseRunner;
+import org.junit.runner.RunWith;
+import org.openrdf.repository.Repository;
+import org.openrdf.repository.RepositoryTest;
+import org.openrdf.repository.sail.SailRepository;
+
+/**
+ * Run the {@link RepositoryTest}s against a {@link SailRepository} wrapping a {@link KiWiCachingSail} on top of a {@link KiWiStore}.
+ * @author Jakob Frank <[email protected]>
+ *
+ */
+@RunWith(KiWiDatabaseRunner.class)
+public class KiWiCachingRepositoryTest extends RepositoryTest {
+
+    private final KiWiConfiguration config;
+
+    private KiWiStore store;
+
+    public KiWiCachingRepositoryTest(KiWiConfiguration config) {
+        this.config = config;
+    }
+
+    /* (non-Javadoc)
+     * @see org.openrdf.repository.RepositoryTest#createRepository()
+     */
+    @Override
+    protected Repository createRepository() throws Exception {
+        store = new KiWiStore(config);
+        KiWiCachingSail cache = new KiWiCachingSail(store, new KiWiQueryCacheConfiguration());
+        return new SailRepository(cache);
+    }
+
+    /**
+     * Drop the database of the store created in {@link #createRepository()} and then run the inherited
+     * tear-down.
+     */
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            // store is still null if createRepository() was never reached
+            if (store != null) {
+                store.getPersistence().dropDatabase();
+            }
+        } finally {
+            // the inherited tear-down has to run even if dropping the database fails
+            super.tearDown();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/7befade4/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java
index cac63ff..dfe8d60 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java
@@ -185,8 +185,8 @@ public class KiWiCacheManager {
                     .eviction()
                         .maxEntries(kiWiConfiguration.getTripleCacheSize())
                     .expiration()
-                        .lifespan(60, TimeUnit.SECONDS)
-                        .maxIdle(30, TimeUnit.SECONDS)
+                        .lifespan(60, TimeUnit.MINUTES)
+                        .maxIdle(30, TimeUnit.MINUTES)
                     .build();
             cacheManager.defineConfiguration(TRIPLE_CACHE, tripleConfiguration);
         }
@@ -348,6 +348,30 @@ public class KiWiCacheManager {
     }
 
 
+    /**
+     * Return the Infinispan cache manager used by the caching infrastructure.
+     *
+     * @return the Infinispan cache manager held by this KiWiCacheManager
+     */
+    public EmbeddedCacheManager getCacheManager() {
+        return cacheManager;
+    }
+
+    /**
+     * Return the global cache manager configuration used by the caching infrastructure.
+     * @return the global configuration held by this KiWiCacheManager
+     */
+    public GlobalConfiguration getGlobalConfiguration() {
+        return globalConfiguration;
+    }
+
+    /**
+     * Return the default cache configuration used by the caching infrastructure.
+     * @return the default cache configuration held by this KiWiCacheManager
+     */
+    public Configuration getDefaultConfiguration() {
+        return defaultConfiguration;
+    }
 
     /**
      * Clear all caches managed by this cache manager.
http://git-wip-us.apache.org/repos/asf/marmotta/blob/7befade4/libraries/kiwi/pom.xml
----------------------------------------------------------------------
diff --git a/libraries/kiwi/pom.xml b/libraries/kiwi/pom.xml
index 50d58ce..d4c382a 100644
--- a/libraries/kiwi/pom.xml
+++ b/libraries/kiwi/pom.xml
@@ -107,6 +107,7 @@
         <module>kiwi-reasoner</module>
         <module>kiwi-sparql</module>
         <module>kiwi-loader</module>
+        <module>kiwi-caching</module>
     </modules>
 
 </project>
