Author: rwesten
Date: Thu Dec 12 16:35:08 2013
New Revision: 1550448
URL: http://svn.apache.org/r1550448
Log:
STANBOL-1200: The RdfEntityDataIterator now uses an iterator that also works if
the SesameRepository does not provide triples sorted by Subjects. This new
iterator however requires to keep subjects in memory to filter out duplicates
Modified:
stanbol/trunk/entityhub/indexing/source/sesame/src/main/java/org/apache/stanbol/entityhub/indexing/source/sesame/RdfIndexingSource.java
Modified:
stanbol/trunk/entityhub/indexing/source/sesame/src/main/java/org/apache/stanbol/entityhub/indexing/source/sesame/RdfIndexingSource.java
URL:
http://svn.apache.org/viewvc/stanbol/trunk/entityhub/indexing/source/sesame/src/main/java/org/apache/stanbol/entityhub/indexing/source/sesame/RdfIndexingSource.java?rev=1550448&r1=1550447&r2=1550448&view=diff
==============================================================================
---
stanbol/trunk/entityhub/indexing/source/sesame/src/main/java/org/apache/stanbol/entityhub/indexing/source/sesame/RdfIndexingSource.java
(original)
+++
stanbol/trunk/entityhub/indexing/source/sesame/src/main/java/org/apache/stanbol/entityhub/indexing/source/sesame/RdfIndexingSource.java
Thu Dec 12 16:35:08 2013
@@ -1,10 +1,14 @@
package org.apache.stanbol.entityhub.indexing.source.sesame;
+import info.aduna.iteration.CloseableIteration;
+import info.aduna.iteration.ConvertingIteration;
+import info.aduna.iteration.DistinctIteration;
+import info.aduna.iteration.FilterIteration;
+
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
-import java.net.URI;
import java.nio.charset.Charset;
import java.util.Collection;
import java.util.HashSet;
@@ -32,6 +36,7 @@ import org.openrdf.model.Literal;
import org.openrdf.model.Model;
import org.openrdf.model.Resource;
import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
import org.openrdf.model.Value;
import org.openrdf.model.ValueFactory;
import org.openrdf.repository.Repository;
@@ -392,71 +397,60 @@ public class RdfIndexingSource extends A
protected class RdfEntityDataIterator implements EntityDataIterator {
protected final RepositoryConnection connection;
- protected final RepositoryResult<Statement> stdItr;
+ protected final CloseableIteration<URI,RepositoryException> subjectItr;
protected final boolean followBNodes;
- private org.openrdf.model.URI currentEntity = null;
- /**
- * The last {@link Statement} read from {@link #stdItr}
- */
- private Statement currentStd = null;
/**
* The current Representation as created by {@link #next()}
*/
protected RdfRepresentation currentRep;
- /**
- * If the {@link #stdItr} is positioned on the 2nd {@link Statement}
- * of the next Entity and {@link #currentStd} holds the first one.
- */
- private boolean nextInitialised = false;
protected RdfEntityDataIterator(boolean followBNodes,
boolean includeInferred, Resource...contexts) throws
RepositoryException{
this.connection = repository.getConnection();
- stdItr = connection.getStatements(null, null, null,
includeInferred, contexts);
+ CloseableIteration<URI, RepositoryException> converter =
+ new ConvertingIteration<Statement, URI,
RepositoryException>(
+ connection.getStatements(null, null, null,
includeInferred, contexts)) {
+ @Override
+ protected URI convert(Statement sourceObject) throws
RepositoryException {
+ Resource r = sourceObject.getSubject();
+ return r instanceof URI ? (URI)r : null;
+ }
+ };
+ CloseableIteration<URI,RepositoryException> filter =
+ new FilterIteration<URI,RepositoryException>(converter){
+ @Override
+ protected boolean accept(URI object) throws
RepositoryException {
+ return object != null;
+ }
+ };
+ this.subjectItr = new DistinctIteration<URI,
RepositoryException>(filter);
this.followBNodes = followBNodes;
entityDataIterators.add(this);
}
@Override
public boolean hasNext() {
- if(nextInitialised){
- return true;
- }
try {
- while(stdItr.hasNext() && (currentStd == null ||
- !(currentStd.getSubject() instanceof
org.openrdf.model.URI))){
- currentStd = stdItr.next();
- }
- if(stdItr.hasNext()){
- nextInitialised = true;
- }
- return nextInitialised;
+ return subjectItr.hasNext();
} catch (RepositoryException e) {
- throw new IllegalArgumentException("Exceptions while reading "
- + "Statements after " + currentStd ,e);
+ throw new IllegalStateException("Exceptions while checking "
+ + "for next subject" ,e);
}
}
@Override
public String next() {
- if(nextInitialised || hasNext()){
- final org.openrdf.model.URI subject =
- (org.openrdf.model.URI)currentStd.getSubject();
+ URI subject = null;
+ try {
+ subject = subjectItr.next();
currentRep = vf.createRdfRepresentation(subject);
- try {
createRepresentation(subject, currentRep.getModel());
- } catch (RepositoryException e) {
- currentRep = null;
- throw new IllegalStateException("Unable to read statements
"
- + "for Entity " + (currentStd != null ?
currentStd.getSubject() :
- "") +"!",e);
- }
- nextInitialised = false;
return subject.toString();
- } else {
+ } catch (RepositoryException e) {
currentRep = null;
- throw new NoSuchElementException();
+ throw new IllegalStateException("Unable to read statements "
+ + "for Entity " + (subject == null ? "unknown" : subject)
+"!",e);
}
}
@@ -471,6 +465,8 @@ public class RdfIndexingSource extends A
*/
protected void createRepresentation(org.openrdf.model.URI subject,
final Model model)
throws RepositoryException {
+ RepositoryResult<Statement> stmts = connection.getStatements(
+ subject,null,null,includeInferred,contexts);
final Set<BNode> bnodes;
final Set<BNode> visited;
if(followBNodeState){
@@ -480,11 +476,9 @@ public class RdfIndexingSource extends A
bnodes = null;
visited = null;
}
- boolean next = false;
- while(!next && stdItr.hasNext()){
- currentStd = stdItr.next();
- next = !subject.equals(currentStd.getSubject());
- if(!next){
+ try {
+ while(stmts.hasNext()){
+ Statement currentStd = stmts.next();
model.add(currentStd);
if(followBNodeState){ //keep referenced BNodes
Value object = currentStd.getObject();
@@ -492,7 +486,9 @@ public class RdfIndexingSource extends A
bnodes.add((BNode)object);
}
} //else do not follow BNode values
- } //else the subject has changed ... stop here
+ }
+ } finally {
+ stmts.close();
}
if(followBNodeState){ //process BNodes
for(BNode bnode : bnodes){
@@ -520,6 +516,9 @@ public class RdfIndexingSource extends A
public void close() {
entityDataIterators.remove(this);
try {
+ subjectItr.close();
+ } catch (RepositoryException e) {/* ignore */}
+ try {
connection.close();
} catch (RepositoryException e) { /* ignore */ }
}
@@ -587,7 +586,7 @@ public class RdfIndexingSource extends A
}
@Override
- public Literal createLiteral(String content, Locale language, URI type) {
+ public Literal createLiteral(String content, Locale language, java.net.URI
type) {
return createLiteralInternal(sesameFactory, content, language, type);
}