Repository: marmotta Updated Branches: refs/heads/develop 93cd20a25 -> 8af7e91ce
using a bloom filter instead of a set for tracking deleted triples Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/290ec309 Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/290ec309 Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/290ec309 Branch: refs/heads/develop Commit: 290ec30986247b95cf9ea9b05a95a0db6ce492ad Parents: b25a268 Author: Sebastian Schaffert <[email protected]> Authored: Mon Mar 17 14:50:36 2014 +0100 Committer: Sebastian Schaffert <[email protected]> Committed: Mon Mar 17 14:50:36 2014 +0100 ---------------------------------------------------------------------- .../marmotta/kiwi/hashing/NodeFunnel.java | 58 ++++++++++ .../kiwi/hashing/PrimitiveSinkOutput.java | 108 +++++++++++++++++++ .../marmotta/kiwi/hashing/TripleFunnel.java | 60 +++++++++++ .../kiwi/persistence/KiWiConnection.java | 16 +-- 4 files changed, 235 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/marmotta/blob/290ec309/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/hashing/NodeFunnel.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/hashing/NodeFunnel.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/hashing/NodeFunnel.java new file mode 100644 index 0000000..efda1e8 --- /dev/null +++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/hashing/NodeFunnel.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.marmotta.kiwi.hashing; + +import com.google.common.hash.Funnel; +import com.google.common.hash.PrimitiveSink; +import org.apache.marmotta.kiwi.io.KiWiIO; +import org.apache.marmotta.kiwi.model.rdf.KiWiNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * A Guava Funnel implementation based on the serialization of the node. + * + * @author Sebastian Schaffert ([email protected]) + */ +public class NodeFunnel implements Funnel<KiWiNode> { + private static Logger log = LoggerFactory.getLogger(NodeFunnel.class); + + private static NodeFunnel instance; + + private NodeFunnel() { + } + + + public synchronized static NodeFunnel getInstance() { + if(instance == null) { + instance = new NodeFunnel(); + } + return instance; + } + + @Override + public void funnel(KiWiNode kiWiNode, PrimitiveSink primitiveSink) { + try { + KiWiIO.writeNode(new PrimitiveSinkOutput(primitiveSink), kiWiNode); + } catch (IOException e) { + log.error("I/O error while writing data to sink (cannot happen)"); + } + } +} http://git-wip-us.apache.org/repos/asf/marmotta/blob/290ec309/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/hashing/PrimitiveSinkOutput.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/hashing/PrimitiveSinkOutput.java 
b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/hashing/PrimitiveSinkOutput.java new file mode 100644 index 0000000..f437c04 --- /dev/null +++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/hashing/PrimitiveSinkOutput.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.marmotta.kiwi.hashing; + +import com.google.common.hash.PrimitiveSink; + +import java.io.DataOutput; +import java.io.IOException; +import java.nio.charset.Charset; + +/** + * Wrapper implementing the java.io.DataOutput so that operations write into a Guava primitive sink for hashing. 
+ * + * @author Sebastian Schaffert ([email protected]) + */ +public class PrimitiveSinkOutput implements DataOutput { + + PrimitiveSink sink; + + public PrimitiveSinkOutput(PrimitiveSink sink) { + this.sink = sink; + } + + @Override + public void write(int i) throws IOException { + sink.putInt(i); + } + + @Override + public void write(byte[] bytes) throws IOException { + sink.putBytes(bytes); + } + + @Override + public void write(byte[] bytes, int i, int i2) throws IOException { + sink.putBytes(bytes,i,i2); + } + + @Override + public void writeBoolean(boolean b) throws IOException { + sink.putBoolean(b); + } + + @Override + public void writeByte(int i) throws IOException { + sink.putByte((byte)i); + } + + @Override + public void writeShort(int i) throws IOException { + sink.putShort((short)i); + } + + @Override + public void writeChar(int i) throws IOException { + sink.putChar((char)i); + } + + @Override + public void writeInt(int i) throws IOException { + sink.putInt(i); + } + + @Override + public void writeLong(long l) throws IOException { + sink.putLong(l); + } + + @Override + public void writeFloat(float v) throws IOException { + sink.putFloat(v); + } + + @Override + public void writeDouble(double v) throws IOException { + sink.putDouble(v); + } + + @Override + public void writeBytes(String s) throws IOException { + sink.putString(s, Charset.defaultCharset()); + } + + @Override + public void writeChars(String s) throws IOException { + sink.putString(s, Charset.defaultCharset()); + } + + @Override + public void writeUTF(String s) throws IOException { + sink.putString(s, Charset.defaultCharset()); + } +} http://git-wip-us.apache.org/repos/asf/marmotta/blob/290ec309/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/hashing/TripleFunnel.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/hashing/TripleFunnel.java 
b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/hashing/TripleFunnel.java new file mode 100644 index 0000000..1ebb368 --- /dev/null +++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/hashing/TripleFunnel.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.marmotta.kiwi.hashing; + +import com.google.common.hash.Funnel; +import com.google.common.hash.PrimitiveSink; +import org.apache.marmotta.kiwi.io.KiWiIO; +import org.apache.marmotta.kiwi.model.rdf.KiWiTriple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * A Guava Funnel implementation based on the serialization of the triple. 
+ * + * @author Sebastian Schaffert ([email protected]) + */ +public class TripleFunnel implements Funnel<KiWiTriple> { + private static Logger log = LoggerFactory.getLogger(TripleFunnel.class); + + private static TripleFunnel instance; + + private TripleFunnel() { + } + + + public synchronized static TripleFunnel getInstance() { + if(instance == null) { + instance = new TripleFunnel(); + } + return instance; + } + + + @Override + public void funnel(KiWiTriple kiWiTriple, PrimitiveSink primitiveSink) { + try { + KiWiIO.writeTriple(new PrimitiveSinkOutput(primitiveSink), kiWiTriple); + } catch (IOException e) { + log.error("I/O error while writing data to sink (cannot happen)"); + } + + } +} http://git-wip-us.apache.org/repos/asf/marmotta/blob/290ec309/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java index 28edf7f..31d3d95 100644 --- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java +++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java @@ -18,6 +18,8 @@ package org.apache.marmotta.kiwi.persistence; import com.google.common.base.Preconditions; +import com.google.common.hash.BloomFilter; +import com.google.common.hash.Funnels; import com.google.common.primitives.Longs; import info.aduna.iteration.*; import org.apache.commons.lang3.math.NumberUtils; @@ -130,7 +132,7 @@ public class KiWiConnection implements AutoCloseable { // this set keeps track of all statements that have been deleted in the active transaction of this connection // this is needed to be able to determine if adding the triple again will merely undo a 
deletion or is a // completely new addition to the triple store - private HashSet<Long> deletedStatementsLog; + private BloomFilter<Long> deletedStatementsLog; private static long numberOfCommits = 0; @@ -147,7 +149,7 @@ public class KiWiConnection implements AutoCloseable { this.uriLock = new ReentrantLock(); this.bnodeLock = new ReentrantLock(); this.batchCommit = dialect.isBatchSupported(); - this.deletedStatementsLog = new HashSet<Long>(); + this.deletedStatementsLog = BloomFilter.create(Funnels.longFunnel(), 100000); this.transactionId = getNextSequence("seq.tx"); initCachePool(); @@ -1134,7 +1136,7 @@ public class KiWiConnection implements AutoCloseable { triple.setId(getNextSequence("seq.triples")); } - if(deletedStatementsLog.contains(triple.getId())) { + if(deletedStatementsLog.mightContain(triple.getId())) { // this is a hack for a concurrency problem that may occur in case the triple is removed in the // transaction and then added again; in these cases the createStatement method might return // an expired state of the triple because it uses its own database connection @@ -1281,7 +1283,7 @@ public class KiWiConnection implements AutoCloseable { deleteTriple.setLong(1, triple.getId()); deleteTriple.executeUpdate(); } - deletedStatementsLog.add(triple.getId()); + deletedStatementsLog.put(triple.getId()); } } finally { commitLock.unlock(); @@ -1294,7 +1296,7 @@ public class KiWiConnection implements AutoCloseable { deleteTriple.setLong(1, triple.getId()); deleteTriple.executeUpdate(); } - deletedStatementsLog.add(triple.getId()); + deletedStatementsLog.put(triple.getId()); } @@ -2282,7 +2284,7 @@ public class KiWiConnection implements AutoCloseable { } - deletedStatementsLog.clear(); + deletedStatementsLog = BloomFilter.create(Funnels.longFunnel(), 100000); if(connection != null) { connection.commit(); @@ -2316,7 +2318,7 @@ public class KiWiConnection implements AutoCloseable { tripleBatch.clear(); } } - deletedStatementsLog.clear(); + deletedStatementsLog = 
BloomFilter.create(Funnels.longFunnel(), 100000); if(connection != null && !connection.isClosed()) { connection.rollback(); }
