RYA-13 Add delete support to secondary indices
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/e5e227c1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/e5e227c1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/e5e227c1 Branch: refs/heads/develop Commit: e5e227c159fdcdb3ccc05af0049b35f78aa4831e Parents: 80faf06 Author: ejwhite922 <[email protected]> Authored: Fri Dec 4 16:35:23 2015 -0500 Committer: ejwhite922 <[email protected]> Committed: Fri Dec 4 16:35:23 2015 -0500 ---------------------------------------------------------------------- .../java/mvm/rya/accumulo/AccumuloRyaDAO.java | 81 ++-- .../accumulo/entity/EntityCentricIndex.java | 64 ++- .../freetext/AccumuloFreeTextIndexer.java | 238 +++++++--- .../accumulo/geo/GeoMesaGeoIndexer.java | 90 ++-- .../temporal/AccumuloTemporalIndexer.java | 216 ++++++--- .../freetext/AccumuloFreeTextIndexerTest.java | 239 ++++++---- .../indexing/accumulo/geo/GeoIndexerTest.java | 451 ++++++++++--------- .../temporal/AccumuloTemporalIndexerTest.java | 249 +++++----- .../src/main/java/EntityDirectExample.java | 121 +++-- .../src/main/java/RyaDirectExample.java | 277 ++++++++---- 10 files changed, 1212 insertions(+), 814 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e5e227c1/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java index 764ca80..84fae68 100644 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java +++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java @@ -31,7 +31,6 @@ import static mvm.rya.api.RdfCloudTripleStoreConstants.NUM_THREADS; import static mvm.rya.api.RdfCloudTripleStoreConstants.RTS_SUBJECT_RYA; import static mvm.rya.api.RdfCloudTripleStoreConstants.RTS_VERSION_PREDICATE_RYA; import static mvm.rya.api.RdfCloudTripleStoreConstants.VERSION_RYA; -import info.aduna.iteration.CloseableIteration; import java.util.Collection; import java.util.Collections; @@ -40,21 +39,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer; -import mvm.rya.accumulo.experimental.AccumuloIndexer; -import mvm.rya.accumulo.query.AccumuloRyaQueryEngine; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.layout.TableLayoutStrategy; -import mvm.rya.api.persist.RyaDAO; -import mvm.rya.api.persist.RyaDAOException; -import mvm.rya.api.persist.RyaNamespaceManager; -import mvm.rya.api.resolver.RyaTripleContext; -import mvm.rya.api.resolver.triple.TripleRow; -import mvm.rya.api.resolver.triple.TripleRowResolverException; - import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchDeleter; @@ -80,6 +64,21 @@ import org.openrdf.model.Namespace; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; +import info.aduna.iteration.CloseableIteration; +import mvm.rya.accumulo.experimental.AccumuloIndexer; +import 
mvm.rya.accumulo.query.AccumuloRyaQueryEngine; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.layout.TableLayoutStrategy; +import mvm.rya.api.persist.RyaDAO; +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.api.persist.RyaNamespaceManager; +import mvm.rya.api.resolver.RyaTripleContext; +import mvm.rya.api.resolver.triple.TripleRow; +import mvm.rya.api.resolver.triple.TripleRowResolverException; + /** * Class AccumuloRyaDAO * Date: Feb 29, 2012 @@ -102,7 +101,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName private BatchWriter bw_ns; private List<AccumuloIndexer> secondaryIndexers; - + private AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); private RyaTableMutationsFactory ryaTableMutationsFactory; private TableLayoutStrategy tableLayoutStrategy; @@ -132,15 +131,15 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName tableLayoutStrategy = conf.getTableLayoutStrategy(); ryaContext = RyaTripleContext.getInstance(conf); ryaTableMutationsFactory = new RyaTableMutationsFactory(ryaContext); - + secondaryIndexers = conf.getAdditionalIndexers(); - + TableOperations tableOperations = connector.tableOperations(); AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getSpo()); AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getPo()); AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getOsp()); AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getNs()); - + for (AccumuloIndexer index : secondaryIndexers) { index.setConf(conf); } @@ -154,7 +153,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName bw_ns = connector.createBatchWriter(tableLayoutStrategy.getNs(), MAX_MEMORY, MAX_TIME, 1); - + for (AccumuloIndexer index : secondaryIndexers) { index.setMultiTableBatchWriter(mt_bw); } @@ -169,7 +168,8 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName } } - public String getVersion() throws RyaDAOException { + @Override + public String getVersion() throws RyaDAOException { String version = null; CloseableIteration<RyaStatement, RyaDAOException> versIter = queryEngine.query(new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, null), conf); if (versIter.hasNext()) { @@ -206,6 +206,10 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName while (query.hasNext()) { deleteSingleRyaStatement(query.next()); } + + for (AccumuloIndexer index : secondaryIndexers) { + index.deleteStatement(stmt); + } } mt_bw.flush(); //TODO currently all indexers do not support delete @@ -213,7 +217,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName throw new RyaDAOException(e); } } - + @Override public void dropGraph(AccumuloRdfConfiguration conf, RyaURI... 
graphs) throws RyaDAOException { BatchDeleter bd_spo = null; @@ -234,16 +238,16 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName bd_po.fetchColumnFamily(new Text(graph.getData())); bd_osp.fetchColumnFamily(new Text(graph.getData())); } - + bd_spo.delete(); bd_po.delete(); bd_osp.delete(); - + //TODO indexers do not support delete-UnsupportedOperation Exception will be thrown // for (AccumuloIndex index : secondaryIndexers) { // index.dropGraph(graphs); // } - + } catch (Exception e) { throw new RyaDAOException(e); } finally { @@ -251,7 +255,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName if (bd_po != null) bd_po.close(); if (bd_osp != null) bd_osp.close(); } - + } protected void deleteSingleRyaStatement(RyaStatement stmt) throws TripleRowResolverException, MutationsRejectedException { @@ -281,7 +285,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName //TODO: Should have a lock here in case we are adding and committing at the same time while (commitStatements.hasNext()) { RyaStatement stmt = commitStatements.next(); - + Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(stmt); Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO); Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO); @@ -289,7 +293,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName bw_spo.addMutations(spo); bw_po.addMutations(po); bw_osp.addMutations(osp); - + for (AccumuloIndexer index : secondaryIndexers) { index.storeStatement(stmt); } @@ -433,11 +437,13 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName return mt_bw; } - public AccumuloRdfConfiguration getConf() { + @Override + public AccumuloRdfConfiguration getConf() { return conf; } - public void setConf(AccumuloRdfConfiguration conf) { + @Override + public void setConf(AccumuloRdfConfiguration conf) { this.conf = conf; } @@ -449,7 +455,8 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName this.ryaTableMutationsFactory = ryaTableMutationsFactory; } - public AccumuloRyaQueryEngine getQueryEngine() { + @Override + public AccumuloRyaQueryEngine getQueryEngine() { return queryEngine; } @@ -460,13 +467,13 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName protected String[] getTables() { // core tables List<String> tableNames = Lists.newArrayList( - tableLayoutStrategy.getSpo(), - tableLayoutStrategy.getPo(), - tableLayoutStrategy.getOsp(), + tableLayoutStrategy.getSpo(), + tableLayoutStrategy.getPo(), + tableLayoutStrategy.getOsp(), tableLayoutStrategy.getNs(), tableLayoutStrategy.getEval()); - - // Additional Tables + + // Additional Tables for (AccumuloIndexer index : secondaryIndexers) { tableNames.add(index.getTableName()); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e5e227c1/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java index b8b3f65..1e2b18a 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java @@ -30,19 +30,6 @@ import 
java.io.IOException; import java.util.Collection; import java.util.List; -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer; -import mvm.rya.accumulo.experimental.AccumuloIndexer; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaType; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.resolver.RdfToRyaConversions; -import mvm.rya.api.resolver.RyaContext; -import mvm.rya.api.resolver.RyaTypeResolverException; -import mvm.rya.api.resolver.triple.TripleRow; -import mvm.rya.indexing.accumulo.ConfigUtils; -import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer; - import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; @@ -56,23 +43,21 @@ import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; -import org.openrdf.model.Statement; -import org.openrdf.query.algebra.evaluation.QueryOptimizer; -import org.openrdf.query.algebra.evaluation.impl.BindingAssigner; -import org.openrdf.query.algebra.evaluation.impl.CompareOptimizer; -import org.openrdf.query.algebra.evaluation.impl.ConjunctiveConstraintSplitter; -import org.openrdf.query.algebra.evaluation.impl.ConstantOptimizer; -import org.openrdf.query.algebra.evaluation.impl.DisjunctiveConstraintOptimizer; -import org.openrdf.query.algebra.evaluation.impl.FilterOptimizer; -import org.openrdf.query.algebra.evaluation.impl.IterativeEvaluationOptimizer; -import org.openrdf.query.algebra.evaluation.impl.OrderLimitOptimizer; -import org.openrdf.query.algebra.evaluation.impl.QueryModelNormalizer; -import org.openrdf.query.algebra.evaluation.impl.SameTermFilterOptimizer; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.primitives.Bytes; +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaType; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.resolver.RyaContext; +import mvm.rya.api.resolver.RyaTypeResolverException; +import mvm.rya.api.resolver.triple.TripleRow; +import mvm.rya.indexing.accumulo.ConfigUtils; + public class EntityCentricIndex extends AbstractAccumuloIndexer { private static final Logger logger = Logger.getLogger(EntityCentricIndex.class); @@ -81,23 +66,23 @@ public class EntityCentricIndex extends AbstractAccumuloIndexer { private AccumuloRdfConfiguration conf; private BatchWriter writer; private boolean isInit = false; - + public static final String CONF_TABLE_SUFFIX = "ac.indexer.eci.tablename"; - + private void init() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, IOException, TableExistsException { ConfigUtils.createTableIfNotExists(conf, ConfigUtils.getEntityTableName(conf)); } - - - @Override + + + @Override public Configuration getConf() { return this.conf; } - + //initialization occurs in setConf because index is created using reflection - @Override + @Override public void setConf(Configuration conf) { if (conf instanceof AccumuloRdfConfiguration) { this.conf = (AccumuloRdfConfiguration) conf; @@ -126,7 +111,7 @@ public class EntityCentricIndex extends AbstractAccumuloIndexer { } } } - + @Override public String getTableName() { @@ -147,7 +132,8 @@ public class EntityCentricIndex 
extends AbstractAccumuloIndexer { } - + + @Override public void storeStatement(RyaStatement stmt) throws IOException { Preconditions.checkNotNull(writer, "BatchWriter not Set"); try { @@ -161,7 +147,8 @@ public class EntityCentricIndex extends AbstractAccumuloIndexer { } } - + + @Override public void deleteStatement(RyaStatement stmt) throws IOException { Preconditions.checkNotNull(writer, "BatchWriter not Set"); try { @@ -185,10 +172,13 @@ public class EntityCentricIndex extends AbstractAccumuloIndexer { byte[] columnQualifier = tripleRow.getColumnQualifier(); Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier); - m.putDelete(cfText, cqText, new ColumnVisibility(tripleRow.getColumnVisibility()), tripleRow.getTimestamp()); + byte[] columnVisibility = tripleRow.getColumnVisibility(); + ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new ColumnVisibility(columnVisibility); + + m.putDelete(cfText, cqText, cv, tripleRow.getTimestamp()); return m; } - + public static Collection<Mutation> createMutations(RyaStatement stmt) throws RyaTypeResolverException{ Collection<Mutation> m = Lists.newArrayList(); for (TripleRow tr : serializeStatement(stmt)){ http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e5e227c1/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java index f529569..fdefbea 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java @@ -22,7 +22,6 @@ package mvm.rya.indexing.accumulo.freetext; import static mvm.rya.indexing.accumulo.freetext.query.ASTNodeUtils.getNodeIterator; -import info.aduna.iteration.CloseableIteration; import java.io.IOException; import java.nio.charset.CharacterCodingException; @@ -35,25 +34,6 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; -import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.resolver.RyaToRdfConversions; -import mvm.rya.indexing.FreeTextIndexer; -import mvm.rya.indexing.StatementContraints; -import mvm.rya.indexing.accumulo.ConfigUtils; -import mvm.rya.indexing.accumulo.Md5Hash; -import mvm.rya.indexing.accumulo.StatementSerializer; -import mvm.rya.indexing.accumulo.freetext.iterators.BooleanTreeIterator; -import mvm.rya.indexing.accumulo.freetext.query.ASTExpression; -import mvm.rya.indexing.accumulo.freetext.query.ASTNodeUtils; -import mvm.rya.indexing.accumulo.freetext.query.ASTSimpleNode; -import mvm.rya.indexing.accumulo.freetext.query.ASTTerm; -import mvm.rya.indexing.accumulo.freetext.query.ParseException; -import mvm.rya.indexing.accumulo.freetext.query.QueryParser; -import mvm.rya.indexing.accumulo.freetext.query.QueryParserTreeConstants; -import mvm.rya.indexing.accumulo.freetext.query.SimpleNode; -import mvm.rya.indexing.accumulo.freetext.query.TokenMgrError; - import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; @@ -82,6 +62,26 @@ import org.openrdf.query.QueryEvaluationException; import 
com.google.common.base.Charsets; +import info.aduna.iteration.CloseableIteration; +import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.resolver.RyaToRdfConversions; +import mvm.rya.indexing.FreeTextIndexer; +import mvm.rya.indexing.StatementContraints; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.accumulo.Md5Hash; +import mvm.rya.indexing.accumulo.StatementSerializer; +import mvm.rya.indexing.accumulo.freetext.iterators.BooleanTreeIterator; +import mvm.rya.indexing.accumulo.freetext.query.ASTExpression; +import mvm.rya.indexing.accumulo.freetext.query.ASTNodeUtils; +import mvm.rya.indexing.accumulo.freetext.query.ASTSimpleNode; +import mvm.rya.indexing.accumulo.freetext.query.ASTTerm; +import mvm.rya.indexing.accumulo.freetext.query.ParseException; +import mvm.rya.indexing.accumulo.freetext.query.QueryParser; +import mvm.rya.indexing.accumulo.freetext.query.QueryParserTreeConstants; +import mvm.rya.indexing.accumulo.freetext.query.SimpleNode; +import mvm.rya.indexing.accumulo.freetext.query.TokenMgrError; + /** * The {@link AccumuloFreeTextIndexer} stores and queries "free text" data from statements into tables in Accumulo. Specifically, this class * stores data into two different Accumulo Tables. This is the <b>document table</b> (default name: triplestore_text) and the <b>terms @@ -92,27 +92,27 @@ import com.google.common.base.Charsets; * <p> * For each document, the document table will store the following information: * <P> - * + * * <pre> - * Row (partition) | Column Family | Column Qualifier | Value + * Row (partition) | Column Family | Column Qualifier | Value * ================+================+==================+========== - * shardID | d\x00 | documentHash | Document - * shardID | s\x00Subject | documentHash | (empty) - * shardID | p\x00Predicate | documentHash | (empty) - * shardID | o\x00Object | documentHash | (empty) - * shardID | c\x00Context | documentHash | (empty) + * shardID | d\x00 | documentHash | Document + * shardID | s\x00Subject | documentHash | (empty) + * shardID | p\x00Predicate | documentHash | (empty) + * shardID | o\x00Object | documentHash | (empty) + * shardID | c\x00Context | documentHash | (empty) * shardID | t\x00token | documentHash | (empty) * </pre> * <p> * Note: documentHash is a sha256 Hash of the Document's Content * <p> - * The terms table is used for expanding wildcard search terms. For each token in the document table, the table sill store the following + * The terms table is used for expanding wildcard search terms. For each token in the document table, the table will store the following * information: - * + * * <pre> - * Row (partition) | CF/CQ/Value + * Row (partition) | CF/CQ/Value * ==================+============= - * l\x00token | (empty) + * l\x00token | (empty) * r\x00Reversetoken | (empty) * </pre> * <p> @@ -121,7 +121,7 @@ import com.google.common.base.Charsets; * into car, bar, and far. * <p> * Example: Given these three statements as inputs: - * + * * <pre> * <uri:paul> rdfs:label "paul smith"@en <uri:graph1> * <uri:steve> rdfs:label "steven anthony miller"@en <uri:graph1> @@ -131,9 +131,9 @@ import com.google.common.base.Charsets; * Here's what the tables would look like: (Note: the hashes aren't real, the rows are not sorted, and the partition ids will vary.) 
* <p> * Triplestore_text - * + * * <pre> - * Row (partition) | Column Family | Column Qualifier | Value + * Row (partition) | Column Family | Column Qualifier | Value * ================+=================================+==================+========== * 000000 | d\x00 | 08b3d233a | uri:graph1x00uri:paul\x00rdfs:label\x00"paul smith"@en * 000000 | s\x00uri:paul | 08b3d233a | (empty) @@ -142,7 +142,7 @@ import com.google.common.base.Charsets; * 000000 | c\x00uri:graph1 | 08b3d233a | (empty) * 000000 | t\x00paul | 08b3d233a | (empty) * 000000 | t\x00smith | 08b3d233a | (empty) - * + * * 000000 | d\x00 | 3a575534b | uri:graph1x00uri:steve\x00rdfs:label\x00"steven anthony miller"@en * 000000 | s\x00uri:steve | 3a575534b | (empty) * 000000 | p\x00rdfs:label | 3a575534b | (empty) @@ -151,7 +151,7 @@ import com.google.common.base.Charsets; * 000000 | t\x00steven | 3a575534b | (empty) * 000000 | t\x00anthony | 3a575534b | (empty) * 000000 | t\x00miller | 3a575534b | (empty) - * + * * 000001 | d\x00 | 7bf670d06 | uri:graph1x00uri:steve\x00rdfs:label\x00"steve miller"@en * 000001 | s\x00uri:steve | 7bf670d06 | (empty) * 000001 | p\x00rdfs:label | 7bf670d06 | (empty) @@ -163,9 +163,9 @@ import com.google.common.base.Charsets; * <p> * triplestore_terms * <p> - * + * * <pre> - * Row (partition) | CF/CQ/Value + * Row (partition) | CF/CQ/Value * ==================+============= * l\x00paul | (empty) * l\x00smith | (empty) @@ -179,12 +179,14 @@ import com.google.common.base.Charsets; * r\x00ynohtna | (empty) * r\x00rellim | (empty) * r\x00evets | (empty) - * + * * <pre> */ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements FreeTextIndexer { private static final Logger logger = Logger.getLogger(AccumuloFreeTextIndexer.class); + private static final boolean IS_TERM_TABLE_TOKEN_DELETION_ENABLED = true; + private static final byte[] EMPTY_BYTES = new byte[] {}; private static final Text EMPTY_TEXT = new Text(EMPTY_BYTES); private static final Value EMPTY_VALUE = new Value(EMPTY_BYTES); @@ -202,10 +204,10 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements private Set<URI> validPredicates; private Configuration conf; - + private boolean isInit = false; - + private void init() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, TableExistsException { String doctable = ConfigUtils.getFreeTextDocTablename(conf); @@ -262,8 +264,8 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements queryTermLimit = ConfigUtils.getFreeTextTermLimit(conf); } - - + + //initialization occurs in setConf because index is created using reflection @Override public void setConf(Configuration conf) { @@ -272,27 +274,18 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements try { init(); isInit = true; - } catch (AccumuloException e) { - logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); - throw new RuntimeException(e); - } catch (AccumuloSecurityException e) { - logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); - throw new RuntimeException(e); - } catch (TableNotFoundException e) { - logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); - throw new RuntimeException(e); - } catch (TableExistsException e) { + } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException | TableExistsException e) { logger.warn("Unable to initialize index. Throwing Runtime Exception. 
", e); throw new RuntimeException(e); } } } - + @Override public Configuration getConf() { return this.conf; } - + private void storeStatement(Statement statement) throws IOException { // if the predicate list is empty, accept all predicates. @@ -363,6 +356,12 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements return m; } + private static Mutation createEmptyPutDeleteMutation(Text row) { + Mutation m = new Mutation(row); + m.putDelete(EMPTY_TEXT, EMPTY_TEXT); + return m; + } + private static Text genPartition(int partition, int numParitions) { int length = Integer.toString(numParitions).length(); return new Text(String.format("%0" + length + "d", Math.abs(partition % numParitions))); @@ -471,13 +470,7 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements private Scanner getScanner(String tablename) throws IOException { try { return ConfigUtils.createScanner(tablename, conf); - } catch (AccumuloException e) { - logger.error("Error connecting to " + tablename); - throw new IOException(e); - } catch (AccumuloSecurityException e) { - logger.error("Error connecting to " + tablename); - throw new IOException(e); - } catch (TableNotFoundException e) { + } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) { logger.error("Error connecting to " + tablename); throw new IOException(e); } @@ -574,7 +567,9 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements @Override public void close() throws QueryEvaluationException { - s.close(); + if (s != null) { + s.close(); + } } }; } @@ -582,7 +577,7 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements /** * Simple adapter that parses the query using {@link QueryParser}. Note: any checked exceptions thrown by {@link QueryParser} are * re-thrown as {@link IOException}s. - * + * * @param query * @return * @throws IOException @@ -600,12 +595,121 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements } return root; } - - + + @Override public String getTableName() { return ConfigUtils.getFreeTextDocTablename(conf); } - + private void deleteStatement(Statement statement) throws IOException { + // if the predicate list is empty, accept all predicates. 
+ // Otherwise, make sure the predicate is on the "valid" list + boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); + + if (isValidPredicate && (statement.getObject() instanceof Literal)) { + + // Get the tokens + String text = statement.getObject().stringValue().toLowerCase(); + SortedSet<String> tokens = tokenizer.tokenize(text); + + if (!tokens.isEmpty()) { + // Get Document Data + String docContent = StatementSerializer.writeStatement(statement); + + String docId = Md5Hash.md5Base64(docContent); + + // Setup partition + Text partition = genPartition(docContent.hashCode(), docTableNumPartitions); + + Mutation docTableMut = new Mutation(partition); + List<Mutation> termTableMutations = new ArrayList<Mutation>(); + + Text docIdText = new Text(docId); + + // Delete the Document Data + docTableMut.putDelete(ColumnPrefixes.DOCS_CF_PREFIX, docIdText); + + // Delete the statement parts in index + docTableMut.putDelete(ColumnPrefixes.getSubjColFam(statement), docIdText); + docTableMut.putDelete(ColumnPrefixes.getPredColFam(statement), docIdText); + docTableMut.putDelete(ColumnPrefixes.getObjColFam(statement), docIdText); + docTableMut.putDelete(ColumnPrefixes.getContextColFam(statement), docIdText); + + + // Delete the statement terms in index + for (String token : tokens) { + if (IS_TERM_TABLE_TOKEN_DELETION_ENABLED) { + int rowId = Integer.parseInt(partition.toString()); + boolean doesTermExistInOtherDocs = doesTermExistInOtherDocs(token, rowId, docIdText); + // Only delete the term from the term table if it doesn't appear in other docs + if (!doesTermExistInOtherDocs) { + // Delete the term in the term table + termTableMutations.add(createEmptyPutDeleteMutation(ColumnPrefixes.getTermListColFam(token))); + termTableMutations.add(createEmptyPutDeleteMutation(ColumnPrefixes.getRevTermListColFam(token))); + } + } + + // Un-tie the token to the document + docTableMut.putDelete(ColumnPrefixes.getTermColFam(token), docIdText); + } + + // write the mutations + try { + docTableBw.addMutation(docTableMut); + termTableBw.addMutations(termTableMutations); + } catch (MutationsRejectedException e) { + logger.error("error adding mutation", e); + throw new IOException(e); + } + + } + } + } + + @Override + public void deleteStatement(RyaStatement statement) throws IOException { + deleteStatement(RyaToRdfConversions.convertStatement(statement)); + } + + /** + * Checks to see if the provided term appears in other documents. + * @param term the term to search for. + * @param currentDocId the current document ID that the search term exists in. + * @return {@code true} if the term was found in other documents. {@code false} otherwise. 
+ */ + private boolean doesTermExistInOtherDocs(String term, int currentDocId, Text docIdText) { + try { + String freeTextDocTableName = ConfigUtils.getFreeTextDocTablename(conf); + Scanner scanner = getScanner(freeTextDocTableName); + + String t = StringUtils.removeEnd(term, "*").toLowerCase(); + Text queryTerm = ColumnPrefixes.getTermColFam(t); + + // perform query and read results + scanner.fetchColumnFamily(queryTerm); + + for (Entry<Key, Value> entry : scanner) { + Key key = entry.getKey(); + Text row = key.getRow(); + int rowId = Integer.parseInt(row.toString()); + // We only want to check other documents from the one we're deleting + if (rowId != currentDocId) { + Text columnFamily = key.getColumnFamily(); + String columnFamilyValue = columnFamily.toString(); + // Check that the value has the term prefix + if (columnFamilyValue.startsWith(ColumnPrefixes.TERM_CF_PREFIX.toString())) { + Text text = ColumnPrefixes.removePrefix(columnFamily); + String value = text.toString(); + if (value.equals(term)) { + return true; + } + } + } + } + } catch (IOException e) { + logger.error("Error searching for the existance of the term in other documents", e); + } + return false; + } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e5e227c1/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/geo/GeoMesaGeoIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/geo/GeoMesaGeoIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/geo/GeoMesaGeoIndexer.java index 37acf89..c8b5b4a 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/geo/GeoMesaGeoIndexer.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/geo/GeoMesaGeoIndexer.java @@ -1,4 +1,4 @@ -package mvm.rya.indexing.accumulo.geo; +package mvm.rya.indexing.accumulo.geo; /* * Licensed to the Apache Software Foundation (ASF) under one @@ -21,8 +21,6 @@ package mvm.rya.indexing.accumulo.geo; -import info.aduna.iteration.CloseableIteration; - import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -30,37 +28,23 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer; -import mvm.rya.accumulo.experimental.AccumuloIndexer; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.resolver.RyaToRdfConversions; -import mvm.rya.indexing.GeoIndexer; -import mvm.rya.indexing.StatementContraints; -import mvm.rya.indexing.accumulo.ConfigUtils; -import mvm.rya.indexing.accumulo.Md5Hash; -import mvm.rya.indexing.accumulo.StatementSerializer; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.MultiTableBatchWriter; -import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.mock.MockInstance; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.log4j.Logger; import org.geotools.data.DataStore; import org.geotools.data.DataStoreFinder; +import 
org.geotools.data.DataUtilities; import org.geotools.data.FeatureSource; import org.geotools.data.FeatureStore; import org.geotools.data.Query; +import org.geotools.factory.CommonFactoryFinder; import org.geotools.factory.Hints; import org.geotools.feature.DefaultFeatureCollection; import org.geotools.feature.FeatureIterator; @@ -68,21 +52,33 @@ import org.geotools.feature.SchemaException; import org.geotools.feature.simple.SimpleFeatureBuilder; import org.geotools.filter.text.cql2.CQLException; import org.geotools.filter.text.ecql.ECQL; +import org.locationtech.geomesa.accumulo.data.AccumuloDataStore; import org.locationtech.geomesa.accumulo.index.Constants; import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes; import org.opengis.feature.simple.SimpleFeature; import org.opengis.feature.simple.SimpleFeatureType; import org.opengis.filter.Filter; +import org.opengis.filter.FilterFactory; +import org.opengis.filter.identity.Identifier; import org.openrdf.model.Literal; import org.openrdf.model.Statement; import org.openrdf.model.URI; import org.openrdf.query.QueryEvaluationException; -import com.google.common.base.Preconditions; import com.vividsolutions.jts.geom.Geometry; import com.vividsolutions.jts.io.ParseException; import com.vividsolutions.jts.io.WKTReader; +import info.aduna.iteration.CloseableIteration; +import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.resolver.RyaToRdfConversions; +import mvm.rya.indexing.GeoIndexer; +import mvm.rya.indexing.StatementContraints; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.accumulo.Md5Hash; +import mvm.rya.indexing.accumulo.StatementSerializer; + /** * A {@link GeoIndexer} wrapper around a GeoMesa {@link AccumuloDataStore}. This class configures and connects to the Datastore, creates the * RDF Feature Type, and interacts with the Datastore. 
@@ -129,7 +125,7 @@ public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoInd private static final Logger logger = Logger.getLogger(GeoMesaGeoIndexer.class); private static final String FEATURE_NAME = "RDF"; - + private static final String SUBJECT_ATTRIBUTE = "S"; private static final String PREDICATE_ATTRIBUTE = "P"; private static final String OBJECT_ATTRIBUTE = "O"; @@ -141,7 +137,7 @@ public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoInd private FeatureSource<SimpleFeatureType, SimpleFeature> featureSource; private SimpleFeatureType featureType; private boolean isInit = false; - + //initialization occurs in setConf because index is created using reflection @Override public void setConf(Configuration conf) { @@ -156,18 +152,18 @@ public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoInd } } } - + @Override public Configuration getConf() { return this.conf; } - + private void init() throws IOException { validPredicates = ConfigUtils.getGeoPredicates(conf); DataStore dataStore = createDataStore(conf); - + try { featureType = getStatementFeatureType(dataStore); } catch (IOException e) { @@ -235,7 +231,7 @@ public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoInd // create a feature collection DefaultFeatureCollection featureCollection = new DefaultFeatureCollection(); - + for (RyaStatement ryaStatement : ryaStatements) { Statement statement = RyaToRdfConversions.convertStatement(ryaStatement); @@ -264,7 +260,7 @@ public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoInd public void storeStatement(RyaStatement statement) throws IOException { storeStatements(Collections.singleton(statement)); } - + private static SimpleFeature createFeature(SimpleFeatureType featureType, Statement statement) throws ParseException { String subject = StatementSerializer.writeSubject(statement); String predicate = StatementSerializer.writePredicate(statement); @@ -358,7 +354,7 @@ public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoInd @Override public Statement next() throws QueryEvaluationException { - SimpleFeature feature = (SimpleFeature) getIterator().next(); + SimpleFeature feature = getIterator().next(); String subjectString = feature.getAttribute(SUBJECT_ATTRIBUTE).toString(); String predicateString = feature.getAttribute(PREDICATE_ATTRIBUTE).toString(); String objectString = feature.getAttribute(OBJECT_ATTRIBUTE).toString(); @@ -440,8 +436,42 @@ public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoInd return ConfigUtils.getGeoTablename(conf); } + private void deleteStatements(Collection<RyaStatement> ryaStatements) throws IOException { + // create a feature collection + DefaultFeatureCollection featureCollection = new DefaultFeatureCollection(); + + for (RyaStatement ryaStatement : ryaStatements) { + Statement statement = RyaToRdfConversions.convertStatement(ryaStatement); + // if the predicate list is empty, accept all predicates. 
+ // Otherwise, make sure the predicate is on the "valid" list + boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); + + if (isValidPredicate && (statement.getObject() instanceof Literal)) { + try { + SimpleFeature feature = createFeature(featureType, statement); + featureCollection.add(feature); + } catch (ParseException e) { + logger.warn("Error getting geo from statement: " + statement.toString(), e); + } + } + } + // remove this feature collection from the store + if (!featureCollection.isEmpty()) { + Set<Identifier> featureIds = new HashSet<Identifier>(); + FilterFactory filterFactory = CommonFactoryFinder.getFilterFactory(null); + Set<String> stringIds = DataUtilities.fidSet(featureCollection); + for (String id : stringIds) { + featureIds.add(filterFactory.featureId(id)); + } + Filter filter = filterFactory.id(featureIds); + featureStore.removeFeatures(filter); + } + } - + @Override + public void deleteStatement(RyaStatement statement) throws IOException { + deleteStatements(Collections.singleton(statement)); + } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e5e227c1/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java index e2f98b3..095f18f 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java @@ -20,8 +20,6 @@ package mvm.rya.indexing.accumulo.temporal; */ -import info.aduna.iteration.CloseableIteration; - import java.io.IOException; import java.nio.charset.CharacterCodingException; import java.util.Collection; @@ -36,21 +34,6 @@ import java.util.regex.Pattern; import javax.xml.datatype.XMLGregorianCalendar; -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer; -import mvm.rya.accumulo.experimental.AccumuloIndexer; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.resolver.RyaToRdfConversions; -import mvm.rya.indexing.KeyParts; -import mvm.rya.indexing.StatementContraints; -import mvm.rya.indexing.TemporalIndexer; -import mvm.rya.indexing.TemporalInstant; -import mvm.rya.indexing.TemporalInterval; -import mvm.rya.indexing.accumulo.ConfigUtils; -import mvm.rya.indexing.accumulo.StatementSerializer; - import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchScanner; @@ -77,13 +60,24 @@ import org.openrdf.model.URI; import org.openrdf.query.QueryEvaluationException; import cern.colt.Arrays; +import info.aduna.iteration.CloseableIteration; +import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.resolver.RyaToRdfConversions; +import mvm.rya.indexing.KeyParts; +import mvm.rya.indexing.StatementContraints; +import mvm.rya.indexing.TemporalIndexer; +import mvm.rya.indexing.TemporalInstant; +import mvm.rya.indexing.TemporalInterval; +import mvm.rya.indexing.accumulo.ConfigUtils; +import 
mvm.rya.indexing.accumulo.StatementSerializer; public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements TemporalIndexer { private static final Logger logger = Logger.getLogger(AccumuloTemporalIndexer.class); private static final String CF_INTERVAL = "interval"; - + // Delimiter used in the interval stored in the triple's object literal. @@ -99,11 +93,11 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements private Set<URI> validPredicates; private String temporalIndexTableName; - + private boolean isInit = false; - - + + private void init() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, TableExistsException { temporalIndexTableName = ConfigUtils.getTemporalTableName(conf); @@ -116,7 +110,7 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements validPredicates = ConfigUtils.getTemporalPredicates(conf); } - + //initialization occurs in setConf because index is created using reflection @Override public void setConf(Configuration conf) { @@ -140,13 +134,13 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements } } } - + @Override public Configuration getConf() { return this.conf; } - - + + /** * Store a statement in the index if it meets the criterion: Object should be * a literal and one of the validPredicates from the configuration. @@ -180,18 +174,18 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements throw new IOException("While adding interval/instant for statement =" + statement, e); } } - - + + @Override public void storeStatement(RyaStatement statement) throws IllegalArgumentException, IOException { storeStatement(RyaToRdfConversions.convertStatement(statement)); } - - + + /** * parse the literal dates from the object of a statement. 
- * + * * @param statement * @param outputDateTimes */ @@ -209,7 +203,7 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements outputDateTimes[1] = new DateTime(matcher.group(2)); return; } catch (java.lang.IllegalArgumentException e) { - logThis = e.getMessage() + " " + logThis; + logThis = e.getMessage() + " " + logThis; outputDateTimes[0]=null; outputDateTimes[1]=null; } @@ -221,7 +215,7 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements outputDateTimes[1] = null; return; } catch (java.lang.IllegalArgumentException e) { - logThis = e.getMessage(); + logThis = e.getMessage(); } // Try again using Joda Time DateTime.parse() try { @@ -230,13 +224,58 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements //System.out.println(">>>>>>>Joda parsed: "+literalValue.stringValue()); return; } catch (java.lang.IllegalArgumentException e) { - logThis = e.getMessage() + " " + logThis; + logThis = e.getMessage() + " " + logThis; } - logger.warn("TemporalIndexer is unable to parse the date/time from statement=" + statement.toString() + " " +logThis); + logger.warn("TemporalIndexer is unable to parse the date/time from statement=" + statement.toString() + " " +logThis); return; } /** + * Remove an interval index + * TODO: integrate into KeyParts (or eliminate) + * @param writer + * @param cv + * @param interval + * @throws MutationsRejectedException + */ + public void removeInterval(BatchWriter writer, TemporalInterval interval, Statement statement) throws MutationsRejectedException { + Text cf = new Text(StatementSerializer.writeContext(statement)); + Text cqBegin = new Text(KeyParts.CQ_BEGIN); + Text cqEnd = new Text(KeyParts.CQ_END); + + // Start Begin index + Text keyText = new Text(interval.getAsKeyBeginning()); + KeyParts.appendUniqueness(statement, keyText); + Mutation m = new Mutation(keyText); + m.putDelete(cf, cqBegin); + writer.addMutation(m); + + // now the end index: + keyText = new Text(interval.getAsKeyEnd()); + KeyParts.appendUniqueness(statement, keyText); + m = new Mutation(keyText); + m.putDelete(cf, cqEnd); + writer.addMutation(m); + } + + /** + * Remove an interval instant + * + * @param writer + * @param cv + * @param instant + * @throws MutationsRejectedException + */ + public void removeInstant(BatchWriter writer, TemporalInstant instant, Statement statement) throws MutationsRejectedException { + KeyParts keyParts = new KeyParts(statement, instant); + for (KeyParts k: keyParts) { + Mutation m = new Mutation(k.getStoreKey()); + m.putDelete(k.cf, k.cq); + writer.addMutation(m); + } + } + + /** * Index a new interval * TODO: integrate into KeyParts (or eliminate) * @param writer @@ -250,9 +289,9 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements Text cf = new Text(StatementSerializer.writeContext(statement)); Text cqBegin = new Text(KeyParts.CQ_BEGIN); Text cqEnd = new Text(KeyParts.CQ_END); - + // Start Begin index - Text keyText =new Text(interval.getAsKeyBeginning()); + Text keyText = new Text(interval.getAsKeyBeginning()); KeyParts.appendUniqueness(statement, keyText); Mutation m = new Mutation(keyText); m.put(cf, cqBegin, statementValue); @@ -270,29 +309,29 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements /** - * Index a new interval - * Make indexes that handle this expression: - * hash( s? p? ) ?o + * Index a new instant + * Make indexes that handle this expression: + * hash( s? p? 
) ?o * == o union hash(s)o union hash(p)o union hash(sp)o - * + * * @param writer * @param cv * @param instant * @throws MutationsRejectedException */ public void addInstant(BatchWriter writer, TemporalInstant instant, Statement statement) throws MutationsRejectedException { - KeyParts keyParts = new KeyParts(statement, instant); - for (KeyParts k: keyParts) { - Mutation m = new Mutation(k.getStoreKey()); - m.put(k.cf, k.cq,k.getValue()); - writer.addMutation(m); - } + KeyParts keyParts = new KeyParts(statement, instant); + for (KeyParts k : keyParts) { + Mutation m = new Mutation(k.getStoreKey()); + m.put(k.cf, k.cq,k.getValue()); + writer.addMutation(m); + } } /** * creates a scanner and handles all the throwables and nulls. - * + * * @param scanner * @return * @throws IOException @@ -364,10 +403,10 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements @Override public Range getRange(KeyParts keyParts) { Text start= null; - if (keyParts.constraintPrefix != null ) // Yes, has constraints + if (keyParts.constraintPrefix != null ) // Yes, has constraints start = keyParts.constraintPrefix; // <-- start specific logic else - start = new Text(KeyParts.HASH_PREFIX_FOLLOWING); + start = new Text(KeyParts.HASH_PREFIX_FOLLOWING); Text endAt = keyParts.getQueryKey(); // <-- end specific logic //System.out.println("Scanning queryInstantBeforeInstant: from:" + KeyParts.toHumanString(start) + " up to:" + KeyParts.toHumanString(endAt)); return new Range(start, true, endAt, false); @@ -376,7 +415,7 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements ScannerBase scanner = query.doQuery(queryInstant, constraints); return getContextIteratorWrapper(scanner, constraints.getContext()); } - + /** * get statements where the date object is after the given queryInstant. */ @@ -464,7 +503,7 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements /** * Get intervals stored in the repository matching the given interval. - * Indexing Intervals will probably change or be removed. + * Indexing Intervals will probably change or be removed. * Currently predicate and subject constraints are filtered on the client. */ @Override @@ -492,7 +531,7 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements /** * find intervals stored in the repository before the given Interval. Find interval endings that are * before the given beginning. - * Indexing Intervals will probably change or be removed. + * Indexing Intervals will probably change or be removed. * Currently predicate and subject constraints are filtered on the client. */ @Override @@ -515,20 +554,20 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements /** * Interval after given interval. Find intervals that begin after the endings of the given interval. * Use the special following prefix mechanism to avoid matching the beginning date. - * Indexing Intervals will probably change or be removed. + * Indexing Intervals will probably change or be removed. * Currently predicate and subject and context constraints are filtered on the client. 
*/ @Override public CloseableIteration<Statement, QueryEvaluationException> queryIntervalAfter( TemporalInterval queryInterval, StatementContraints constraints) throws QueryEvaluationException { - + Scanner scanner = getScanner(); if (scanner != null) { // get rows where the start date is greater than the queryInterval.getEnd() Range range = new Range(new Key(Range.followingPrefix(new Text(queryInterval.getHasEnd().getAsKeyBytes()))), false, null, true); scanner.setRange(range); - + if (constraints.hasContext()) scanner.fetchColumn(new Text(constraints.getContext().toString()), new Text(KeyParts.CQ_BEGIN)); else @@ -540,14 +579,14 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements // -- // -- END of Query functions. Next up, general stuff used by the queries above. // -- - + /** * Allows passing range specific logic into doQuery. * Each query function implements an anonymous instance of this and calls it's doQuery(). */ abstract class Query { abstract protected Range getRange(KeyParts keyParts); - + public ScannerBase doQuery(TemporalInstant queryInstant, StatementContraints constraints) throws QueryEvaluationException { // key is contraintPrefix + time, or just time. // Any constraints handled here, if the constraints are empty, the @@ -558,7 +597,7 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements scanner = getBatchScanner(); else scanner = getScanner(); - + Collection<Range> ranges = new HashSet<Range>(); KeyParts lastKeyParts = null; Range range = null; @@ -579,7 +618,7 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements /** * An iteration wrapper for a loaded scanner that is returned for each query above. - * + * * @param scanner * the results to iterate, then close. * @return an anonymous object that will iterate the resulting statements from a given scanner. @@ -623,14 +662,14 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements }; } - + /** * An iteration wrapper for a loaded scanner that is returned for partially supported interval queries above. - * + * * @param scanner the results to iterate, then close. * @param constraints limit statements returned by next() to those matching the constraints. * @return an anonymous object that will iterate the resulting statements from a given scanner. - * @throws QueryEvaluationException + * @throws QueryEvaluationException */ private static CloseableIteration<Statement, QueryEvaluationException> getConstrainedIteratorWrapper(final Scanner scanner, final StatementContraints constraints) { if (!constraints.hasContext() && !constraints.hasSubject() && !constraints.hasPredicates()) @@ -645,11 +684,11 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements /** * An iteration wrapper for a loaded scanner that is returned for queries above. * Currently, this temporal index supports contexts only on the client, using this filter. - * + * * @param scanner the results to iterate, then close. * @param constraints limit statements returned by next() to those matching the constraints. * @return an anonymous object that will iterate the resulting statements from a given scanner. 
- * @throws QueryEvaluationException + * @throws QueryEvaluationException */ private static CloseableIteration<Statement, QueryEvaluationException> getContextIteratorWrapper(final ScannerBase scanner, final Resource context) { if (context==null) @@ -671,7 +710,7 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements private boolean isInitialized = false; final private Iterator<Entry<Key, Value>> i; final private ScannerBase scanner; - + ConstrainedIteratorWrapper(ScannerBase scanner) { this.scanner = scanner; i=scanner.iterator(); @@ -698,7 +737,7 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements } /** - * Gets the next statement meeting constraints and stores in nextStatement. + * Gets the next statement meeting constraints and stores in nextStatement. * Sets null when all done, or on exception. * @throws QueryEvaluationException */ @@ -727,7 +766,7 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements } } public abstract boolean allowedBy(Statement s); - + @Override public void remove() { throw new UnsupportedOperationException("Remove not implemented"); @@ -751,15 +790,15 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements {System.out.println("Constrain subject: "+constraints.getSubject()+" != " + statement.getSubject()); return false;} //return false; - if (! allowedByContext(statement, constraints.getContext())) + if (! allowedByContext(statement, constraints.getContext())) return false; //{System.out.println("Constrain context: "+constraints.getContext()+" != " + statement.getContext()); return false;} - + if (constraints.hasPredicates() && ! constraints.getPredicates().contains(statement.getPredicate())) return false; //{System.out.println("Constrain predicate: "+constraints.getPredicates()+" != " + statement.getPredicate()); return false;} - - System.out.println("allow statement: "+ statement.toString()); + + System.out.println("allow statement: "+ statement.toString()); return true; } @@ -812,13 +851,42 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements throw new IOException(msg, e); } } - - + + @Override public String getTableName() { return ConfigUtils.getTemporalTableName(conf); } - + private void deleteStatement(Statement statement) throws IOException, IllegalArgumentException { + // if the predicate list is empty, accept all predicates. + // Otherwise, make sure the predicate is on the "valid" list + boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); + if (!isValidPredicate || !(statement.getObject() instanceof Literal)) + return; + DateTime[] indexDateTimes = new DateTime[2]; // 0 begin, 1 end of interval + extractDateTime(statement, indexDateTimes); + if (indexDateTimes[0] == null) { + return; + } + + // Remove this as an instant, or interval. 
+ try { + if (indexDateTimes[1] != null) { + TemporalInterval interval = new TemporalInterval(new TemporalInstantRfc3339(indexDateTimes[0]), new TemporalInstantRfc3339(indexDateTimes[1])); + removeInterval(temporalIndexBatchWriter, interval, statement); + } else { + TemporalInstant instant = new TemporalInstantRfc3339(indexDateTimes[0]); + removeInstant(temporalIndexBatchWriter, instant, statement); + } + } catch (MutationsRejectedException e) { + throw new IOException("While adding interval/instant for statement =" + statement, e); + } + } + + @Override + public void deleteStatement(RyaStatement statement) throws IllegalArgumentException, IOException { + deleteStatement(RyaToRdfConversions.convertStatement(statement)); + } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e5e227c1/extras/indexing/src/test/java/mvm/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/mvm/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java b/extras/indexing/src/test/java/mvm/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java index a0a3a03..c6bd9c2 100644 --- a/extras/indexing/src/test/java/mvm/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java +++ b/extras/indexing/src/test/java/mvm/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java @@ -1,5 +1,30 @@ package mvm.rya.indexing.accumulo.freetext; +import java.util.HashSet; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.conf.Configuration; +import org.junit.Before; +import org.junit.Test; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.RDFS; + +import com.google.common.collect.Sets; + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e5e227c1/extras/indexing/src/test/java/mvm/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/mvm/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java b/extras/indexing/src/test/java/mvm/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java
index a0a3a03..c6bd9c2 100644
--- a/extras/indexing/src/test/java/mvm/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java
+++ b/extras/indexing/src/test/java/mvm/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java
@@ -1,5 +1,30 @@
 package mvm.rya.indexing.accumulo.freetext;
 
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Before;
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.RDFS;
+
+import com.google.common.collect.Sets;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -22,11 +47,6 @@ package mvm.rya.indexing.accumulo.freetext;
 
 import info.aduna.iteration.CloseableIteration;
-
-import java.util.HashSet;
-import java.util.Map.Entry;
-import java.util.Set;
-
 import junit.framework.Assert;
 import mvm.rya.api.domain.RyaStatement;
 import mvm.rya.api.domain.RyaType;
@@ -36,27 +56,6 @@ import mvm.rya.api.resolver.RyaToRdfConversions;
 import mvm.rya.indexing.StatementContraints;
 import mvm.rya.indexing.accumulo.ConfigUtils;
 
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableExistsException;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.data.Key;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Before;
-import org.junit.Test;
-import org.openrdf.model.Statement;
-import org.openrdf.model.URI;
-import org.openrdf.model.Value;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.URIImpl;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.model.vocabulary.RDFS;
-
-import com.google.common.collect.Sets;
-
 public class AccumuloFreeTextIndexerTest {
     private static final StatementContraints EMPTY_CONSTRAINTS = new StatementContraints();
@@ -90,107 +89,153 @@ public class AccumuloFreeTextIndexerTest {
 
     @Test
     public void testSearch() throws Exception {
-
-        AccumuloFreeTextIndexer f = new AccumuloFreeTextIndexer();
-        f.setConf(conf);
+        try (AccumuloFreeTextIndexer f = new AccumuloFreeTextIndexer()) {
+            f.setConf(conf);
+
+            ValueFactory vf = new ValueFactoryImpl();
+
+            URI subject = new URIImpl("foo:subj");
+            URI predicate = RDFS.LABEL;
+            Value object = vf.createLiteral("this is a new hat");
+
+            URI context = new URIImpl("foo:context");
 
-        ValueFactory vf = new ValueFactoryImpl();
+            Statement statement = vf.createStatement(subject, predicate, object, context);
+            f.storeStatement(RdfToRyaConversions.convertStatement(statement));
+            f.flush();
 
-        URI subject = new URIImpl("foo:subj");
-        URI predicate = RDFS.LABEL;
-        Value object = vf.createLiteral("this is a new hat");
+            printTables(conf);
 
-        URI context = new URIImpl("foo:context");
+            Assert.assertEquals(Sets.newHashSet(), getSet(f.queryText("asdf", EMPTY_CONSTRAINTS)));
 
-        Statement statement = vf.createStatement(subject, predicate, object, context);
-        f.storeStatement(RdfToRyaConversions.convertStatement(statement));
-        f.flush();
+            Assert.assertEquals(Sets.newHashSet(), getSet(f.queryText("this & !is", EMPTY_CONSTRAINTS)));
 
-        printTables(conf);
+            Assert.assertEquals(Sets.newHashSet(statement), getSet(f.queryText("this", EMPTY_CONSTRAINTS)));
+            Assert.assertEquals(Sets.newHashSet(statement), getSet(f.queryText("is", EMPTY_CONSTRAINTS)));
+            Assert.assertEquals(Sets.newHashSet(statement), getSet(f.queryText("a", EMPTY_CONSTRAINTS)));
+            Assert.assertEquals(Sets.newHashSet(statement), getSet(f.queryText("new", EMPTY_CONSTRAINTS)));
+            Assert.assertEquals(Sets.newHashSet(statement), getSet(f.queryText("hat", EMPTY_CONSTRAINTS)));
 
-        Assert.assertEquals(Sets.newHashSet(), getSet(f.queryText("asdf", EMPTY_CONSTRAINTS)));
+            Assert.assertEquals(Sets.newHashSet(statement), getSet(f.queryText("ha*", EMPTY_CONSTRAINTS)));
+            Assert.assertEquals(Sets.newHashSet(statement), getSet(f.queryText("*at", EMPTY_CONSTRAINTS)));
-        Assert.assertEquals(Sets.newHashSet(), getSet(f.queryText("this & !is", EMPTY_CONSTRAINTS)));
+            Assert.assertEquals(Sets.newHashSet(statement), getSet(f.queryText("hat & new", EMPTY_CONSTRAINTS)));
 
-        Assert.assertEquals(Sets.newHashSet(statement), getSet(f.queryText("this", EMPTY_CONSTRAINTS)));
-        Assert.assertEquals(Sets.newHashSet(statement), getSet(f.queryText("is", EMPTY_CONSTRAINTS)));
-        Assert.assertEquals(Sets.newHashSet(statement), getSet(f.queryText("a", EMPTY_CONSTRAINTS)));
-        Assert.assertEquals(Sets.newHashSet(statement), getSet(f.queryText("new", EMPTY_CONSTRAINTS)));
-        Assert.assertEquals(Sets.newHashSet(statement), getSet(f.queryText("hat", EMPTY_CONSTRAINTS)));
+            Assert.assertEquals(Sets.newHashSet(statement), getSet(f.queryText("this & hat & new", EMPTY_CONSTRAINTS)));
 
-        Assert.assertEquals(Sets.newHashSet(statement), getSet(f.queryText("ha*", EMPTY_CONSTRAINTS)));
-        Assert.assertEquals(Sets.newHashSet(statement), getSet(f.queryText("*at", EMPTY_CONSTRAINTS)));
+            Assert.assertEquals(Sets.newHashSet(), getSet(f.queryText("bat", EMPTY_CONSTRAINTS)));
+            Assert.assertEquals(Sets.newHashSet(), getSet(f.queryText("this & bat", EMPTY_CONSTRAINTS)));
+        }
+    }
+
+    @Test
+    public void testDelete() throws Exception {
+        try (AccumuloFreeTextIndexer f = new AccumuloFreeTextIndexer()) {
+            f.setConf(conf);
+
+            ValueFactory vf = new ValueFactoryImpl();
+
+            URI subject1 = new URIImpl("foo:subj");
+            URI predicate1 = RDFS.LABEL;
+            Value object1 = vf.createLiteral("this is a new hat");
+
+            URI context1 = new URIImpl("foo:context");
+
+            Statement statement1 = vf.createStatement(subject1, predicate1, object1, context1);
+            f.storeStatement(RdfToRyaConversions.convertStatement(statement1));
+
+            URI subject2 = new URIImpl("foo:subject");
+            URI predicate2 = RDFS.LABEL;
+            Value object2 = vf.createLiteral("Do you like my new hat?");
 
-        Assert.assertEquals(Sets.newHashSet(statement), getSet(f.queryText("hat & new", EMPTY_CONSTRAINTS)));
+            URI context2 = new URIImpl("foo:context");
 
-        Assert.assertEquals(Sets.newHashSet(statement), getSet(f.queryText("this & hat & new", EMPTY_CONSTRAINTS)));
+            Statement statement2 = vf.createStatement(subject2, predicate2, object2, context2);
+            f.storeStatement(RdfToRyaConversions.convertStatement(statement2));
 
-        Assert.assertEquals(Sets.newHashSet(), getSet(f.queryText("bat", EMPTY_CONSTRAINTS)));
-        Assert.assertEquals(Sets.newHashSet(), getSet(f.queryText("this & bat", EMPTY_CONSTRAINTS)));
+            f.flush();
 
-        f.close();
+
+            System.out.println("testDelete: BEFORE DELETE");
+            printTables(conf);
+
+            f.deleteStatement(RdfToRyaConversions.convertStatement(statement1));
+            System.out.println("testDelete: AFTER FIRST DELETION");
+            printTables(conf);
+            Assert.assertEquals(Sets.newHashSet(), getSet(f.queryText("this is a new hat", EMPTY_CONSTRAINTS)));
+            Assert.assertEquals(Sets.newHashSet(statement2), getSet(f.queryText("Do you like my new hat?", EMPTY_CONSTRAINTS)));
+
+            // Check that "new" didn't get deleted from the term table after "this is a new hat"
+            // was deleted since "new" is still in "Do you like my new hat?"
+            Assert.assertEquals(Sets.newHashSet(statement2), getSet(f.queryText("new", EMPTY_CONSTRAINTS)));
+
+            f.deleteStatement(RdfToRyaConversions.convertStatement(statement2));
+            System.out.println("testDelete: AFTER LAST DELETION");
+            printTables(conf);
+
+            System.out.println("testDelete: DONE");
+            Assert.assertEquals(Sets.newHashSet(), getSet(f.queryText("this is a new hat", EMPTY_CONSTRAINTS)));
+            Assert.assertEquals(Sets.newHashSet(), getSet(f.queryText("Do you like my new hat?", EMPTY_CONSTRAINTS)));
+        }
     }
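The assertions in testDelete pin down the subtle part of free-text delete: removing "this is a new hat" must drop that statement's document entries while leaving the shared term "new" queryable, because another stored statement still contains it. A hedged sketch of the kind of check this implies follows; the table layout, class, and method names here are illustrative, not the indexer's actual internals:

    import java.util.Map.Entry;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.Scanner;
    import org.apache.accumulo.core.client.TableNotFoundException;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Range;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.security.Authorizations;

    public class TermReferenceSketch {
        /**
         * Returns true if some remaining document row still references the
         * term, in which case the shared term-table entry must survive the
         * deletion of a single statement.
         */
        public static boolean termStillReferenced(Connector conn, String docTable, String term)
                throws TableNotFoundException {
            Scanner scan = conn.createScanner(docTable, Authorizations.EMPTY);
            scan.setRange(Range.prefix(term)); // assumes a term-first row layout
            for (Entry<Key, Value> entry : scan) {
                return true; // at least one document still contains the term
            }
            return false;
        }
    }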
 
     @Test
     public void testRestrictPredicatesSearch() throws Exception {
         conf.setStrings(ConfigUtils.FREETEXT_PREDICATES_LIST, "pred:1,pred:2");
-
-        AccumuloFreeTextIndexer f = new AccumuloFreeTextIndexer();
-        f.setConf(conf);
 
-        // These should not be stored because they are not in the predicate list
-        f.storeStatement(new RyaStatement(new RyaURI("foo:subj1"), new RyaURI(RDFS.LABEL.toString()), new RyaType("invalid")));
-        f.storeStatement(new RyaStatement(new RyaURI("foo:subj2"), new RyaURI(RDFS.COMMENT.toString()), new RyaType("invalid")));
+        try (AccumuloFreeTextIndexer f = new AccumuloFreeTextIndexer()) {
+            f.setConf(conf);
 
-        RyaURI pred1 = new RyaURI("pred:1");
-        RyaURI pred2 = new RyaURI("pred:2");
+            // These should not be stored because they are not in the predicate list
+            f.storeStatement(new RyaStatement(new RyaURI("foo:subj1"), new RyaURI(RDFS.LABEL.toString()), new RyaType("invalid")));
+            f.storeStatement(new RyaStatement(new RyaURI("foo:subj2"), new RyaURI(RDFS.COMMENT.toString()), new RyaType("invalid")));
 
-        // These should be stored because they are in the predicate list
-        RyaStatement s3 = new RyaStatement(new RyaURI("foo:subj3"), pred1, new RyaType("valid"));
-        RyaStatement s4 = new RyaStatement(new RyaURI("foo:subj4"), pred2, new RyaType("valid"));
-        f.storeStatement(s3);
-        f.storeStatement(s4);
+            RyaURI pred1 = new RyaURI("pred:1");
+            RyaURI pred2 = new RyaURI("pred:2");
 
-        // This should not be stored because the object is not a literal
-        f.storeStatement(new RyaStatement(new RyaURI("foo:subj5"), pred1, new RyaURI("in:valid")));
+            // These should be stored because they are in the predicate list
+            RyaStatement s3 = new RyaStatement(new RyaURI("foo:subj3"), pred1, new RyaType("valid"));
+            RyaStatement s4 = new RyaStatement(new RyaURI("foo:subj4"), pred2, new RyaType("valid"));
+            f.storeStatement(s3);
+            f.storeStatement(s4);
 
-        f.flush();
+            // This should not be stored because the object is not a literal
+            f.storeStatement(new RyaStatement(new RyaURI("foo:subj5"), pred1, new RyaURI("in:valid")));
 
-        printTables(conf);
+            f.flush();
 
-        Assert.assertEquals(Sets.newHashSet(), getSet(f.queryText("invalid", EMPTY_CONSTRAINTS)));
-        Assert.assertEquals(Sets.newHashSet(), getSet(f.queryText("in:valid", EMPTY_CONSTRAINTS)));
+            printTables(conf);
 
-        Set<Statement> actual = getSet(f.queryText("valid", EMPTY_CONSTRAINTS));
-        Assert.assertEquals(2, actual.size());
-        Assert.assertTrue(actual.contains(RyaToRdfConversions.convertStatement(s3)));
-        Assert.assertTrue(actual.contains(RyaToRdfConversions.convertStatement(s4)));
+            Assert.assertEquals(Sets.newHashSet(), getSet(f.queryText("invalid", EMPTY_CONSTRAINTS)));
+            Assert.assertEquals(Sets.newHashSet(), getSet(f.queryText("in:valid", EMPTY_CONSTRAINTS)));
 
-        f.close();
+            Set<Statement> actual = getSet(f.queryText("valid", EMPTY_CONSTRAINTS));
+            Assert.assertEquals(2, actual.size());
+            Assert.assertTrue(actual.contains(RyaToRdfConversions.convertStatement(s3)));
+            Assert.assertTrue(actual.contains(RyaToRdfConversions.convertStatement(s4)));
+        }
     }
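A pattern worth noting in these rewritten tests: every AccumuloFreeTextIndexer is now opened in a try-with-resources block instead of being closed with a trailing f.close(), so the indexer is released even when an assertion throws. That only compiles because the indexer is (Auto)Closeable. A minimal sketch of the idiom with a stand-in class (FakeIndexer is hypothetical, not a Rya type):

    import java.io.Closeable;
    import java.io.IOException;

    public class CloseableIndexerSketch {
        // Stand-in for an indexer that owns Accumulo writers.
        static class FakeIndexer implements Closeable {
            void store(String doc) { /* index the document */ }
            @Override
            public void close() throws IOException { /* flush and release writers */ }
        }

        public static void main(String[] args) throws IOException {
            try (FakeIndexer indexer = new FakeIndexer()) {
                indexer.store("this is a new hat");
            } // close() runs here even if store(...) throws
        }
    }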
 
     @Test
     public void testContextSearch() throws Exception {
-
-        AccumuloFreeTextIndexer f = new AccumuloFreeTextIndexer();
-        f.setConf(conf);
-
-        ValueFactory vf = new ValueFactoryImpl();
-        URI subject = new URIImpl("foo:subj");
-        URI predicate = new URIImpl(RDFS.COMMENT.toString());
-        Value object = vf.createLiteral("this is a new hat");
-        URI context = new URIImpl("foo:context");
-
-        Statement statement = vf.createStatement(subject, predicate, object, context);
-        f.storeStatement(RdfToRyaConversions.convertStatement(statement));
-        f.flush();
-
-        Assert.assertEquals(Sets.newHashSet(statement), getSet(f.queryText("hat", EMPTY_CONSTRAINTS)));
-        Assert.assertEquals(Sets.newHashSet(statement), getSet(f.queryText("hat", new StatementContraints().setContext(context))));
-        Assert.assertEquals(Sets.newHashSet(),
-                getSet(f.queryText("hat", new StatementContraints().setContext(vf.createURI("foo:context2")))));
-
-        f.close();
+        try (AccumuloFreeTextIndexer f = new AccumuloFreeTextIndexer()) {
+            f.setConf(conf);
+
+            ValueFactory vf = new ValueFactoryImpl();
+            URI subject = new URIImpl("foo:subj");
+            URI predicate = new URIImpl(RDFS.COMMENT.toString());
+            Value object = vf.createLiteral("this is a new hat");
+            URI context = new URIImpl("foo:context");
+
+            Statement statement = vf.createStatement(subject, predicate, object, context);
+            f.storeStatement(RdfToRyaConversions.convertStatement(statement));
+            f.flush();
+
+            Assert.assertEquals(Sets.newHashSet(statement), getSet(f.queryText("hat", EMPTY_CONSTRAINTS)));
+            Assert.assertEquals(Sets.newHashSet(statement), getSet(f.queryText("hat", new StatementContraints().setContext(context))));
+            Assert.assertEquals(Sets.newHashSet(),
+                    getSet(f.queryText("hat", new StatementContraints().setContext(vf.createURI("foo:context2")))));
+        }
     }
 
     public static void printTables(Configuration conf) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
@@ -201,7 +246,7 @@ public class AccumuloFreeTextIndexerTest {
         for (String table : tops.list()) {
             System.out.println("Reading : " + table);
             System.out.format(FORMAT, "--Row--", "--ColumnFamily--", "--ColumnQualifier--", "--Value--");
-            Scanner s = ConfigUtils.getConnector(conf).createScanner(table, Constants.NO_AUTHS);
+            Scanner s = ConfigUtils.getConnector(conf).createScanner(table, Authorizations.EMPTY);
             for (Entry<Key, org.apache.accumulo.core.data.Value> entry : s) {
                 Key k = entry.getKey();
                 System.out.format(FORMAT, k.getRow(), k.getColumnFamily(), k.getColumnQualifier(), entry.getValue());
