RYA-324, RYA-272 Geo refactoring and examples closes #182
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/9e76b8d7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/9e76b8d7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/9e76b8d7 Branch: refs/heads/master Commit: 9e76b8d7c716d66192bd2ebe1eb7722c5cd079e1 Parents: d47190b Author: David Lotts <[email protected]> Authored: Wed Jun 14 17:50:11 2017 -0400 Committer: David Lotts <[email protected]> Committed: Wed Aug 30 16:28:37 2017 -0400 ---------------------------------------------------------------------- .../api/persist/index/RyaSecondaryIndexer.java | 5 +- .../accumulo/experimental/AccumuloIndexer.java | 1 - extras/pom.xml | 1 + extras/rya.geoindexing/geo.common/pom.xml | 25 + .../GeoEnabledFilterFunctionOptimizer.java | 353 ++++++++++ .../org/apache/rya/indexing/GeoIndexer.java | 210 ++++++ .../org/apache/rya/indexing/GeoIndexerType.java | 76 +++ .../rya/indexing/GeoIndexingTestUtils.java | 51 ++ .../apache/rya/indexing/GeoRyaSailFactory.java | 151 +++++ .../rya/indexing/GeoTemporalIndexerType.java | 55 ++ .../indexing/accumulo/geo/GeoParseUtils.java | 172 +++++ .../rya/indexing/accumulo/geo/GeoTupleSet.java | 498 ++++++++++++++ .../accumulo/geo/OptionalConfigUtils.java | 149 +++++ .../GeoTemporalExternalSetMatcherFactory.java | 44 ++ .../geotemporal/GeoTemporalIndexException.java | 57 ++ .../GeoTemporalIndexSetProvider.java | 239 +++++++ .../geotemporal/GeoTemporalIndexer.java | 197 ++++++ .../geotemporal/GeoTemporalIndexerFactory.java | 57 ++ .../geotemporal/GeoTemporalOptimizer.java | 69 ++ .../GeoTemporalToSegmentConverter.java | 51 ++ .../rya/indexing/geotemporal/model/Event.java | 218 ++++++ .../geotemporal/model/EventQueryNode.java | 372 +++++++++++ .../geotemporal/storage/EventStorage.java | 130 ++++ extras/rya.geoindexing/geo.geomesa/pom.xml | 51 ++ .../accumulo/geo/GeoMesaGeoIndexer.java | 519 ++++++++++++++ 
.../rya/indexing/accumulo/geo/GmlParser.java | 48 ++ .../geoExamples/RyaGeoDirectExample.java | 404 +++++++++++ .../indexing/accumulo/geo/GeoIndexerSfTest.java | 520 +++++++++++++++ .../indexing/accumulo/geo/GeoIndexerTest.java | 395 +++++++++++ extras/rya.geoindexing/geo.geowave/pom.xml | 61 ++ .../accumulo/geo/GeoWaveGeoIndexer.java | 668 +++++++++++++++++++ .../rya/indexing/accumulo/geo/GmlParser.java | 48 ++ .../geoExamples/GeowaveDirectExample.java | 445 ++++++++++++ .../accumulo/geo/GeoWaveFeatureReaderTest.java | 384 +++++++++++ .../accumulo/geo/GeoWaveGTQueryTest.java | 254 +++++++ .../accumulo/geo/GeoWaveIndexerSfTest.java | 536 +++++++++++++++ .../accumulo/geo/GeoWaveIndexerTest.java | 447 +++++++++++++ extras/rya.geoindexing/geo.mongo/pom.xml | 41 ++ .../geoExamples/RyaMongoGeoDirectExample.java | 238 +++++++ .../mongo/EventDocumentConverter.java | 171 +++++ .../geotemporal/mongo/EventUpdater.java | 85 +++ .../GeoTemporalMongoDBStorageStrategy.java | 300 +++++++++ .../geotemporal/mongo/MongoEventStorage.java | 195 ++++++ .../mongo/MongoGeoTemporalIndexer.java | 227 +++++++ .../mongodb/geo/GeoMongoDBStorageStrategy.java | 247 +++++++ .../rya/indexing/mongodb/geo/GmlParser.java | 48 ++ .../indexing/mongodb/geo/MongoGeoIndexer.java | 154 +++++ .../indexing/mongodb/geo/MongoGeoTupleSet.java | 361 ++++++++++ .../geotemporal/GeoTemporalProviderTest.java | 222 ++++++ .../geotemporal/GeoTemporalTestBase.java | 140 ++++ .../geotemporal/MongoGeoTemporalIndexIT.java | 176 +++++ .../geotemporal/model/EventQueryNodeTest.java | 362 ++++++++++ .../mongo/EventDocumentConverterTest.java | 64 ++ .../GeoTemporalMongoDBStorageStrategyTest.java | 490 ++++++++++++++ .../mongo/MongoEventStorageTest.java | 197 ++++++ .../mongo/MongoGeoTemporalIndexerIT.java | 115 ++++ .../indexing/geotemporal/mongo/MongoITBase.java | 64 ++ .../indexing/mongo/MongoGeoIndexerSfTest.java | 262 ++++++++ .../rya/indexing/mongo/MongoGeoIndexerTest.java | 370 ++++++++++ extras/rya.geoindexing/pom.xml 
| 271 ++++---- .../GeoEnabledFilterFunctionOptimizer.java | 332 --------- .../org/apache/rya/indexing/GeoIndexer.java | 210 ------ .../org/apache/rya/indexing/GeoIndexerType.java | 61 -- .../apache/rya/indexing/GeoRyaSailFactory.java | 150 ----- .../rya/indexing/OptionalConfigUtils.java | 140 ---- .../accumulo/geo/GeoMesaGeoIndexer.java | 520 --------------- .../indexing/accumulo/geo/GeoParseUtils.java | 148 ---- .../rya/indexing/accumulo/geo/GeoTupleSet.java | 499 -------------- .../accumulo/geo/GeoWaveGeoIndexer.java | 668 ------------------- .../GeoTemporalExternalSetMatcherFactory.java | 44 -- .../geotemporal/GeoTemporalIndexException.java | 57 -- .../GeoTemporalIndexSetProvider.java | 239 ------- .../geotemporal/GeoTemporalIndexer.java | 193 ------ .../geotemporal/GeoTemporalIndexerFactory.java | 53 -- .../geotemporal/GeoTemporalOptimizer.java | 69 -- .../GeoTemporalToSegmentConverter.java | 51 -- .../rya/indexing/geotemporal/model/Event.java | 218 ------ .../geotemporal/model/EventQueryNode.java | 372 ----------- .../mongo/EventDocumentConverter.java | 171 ----- .../geotemporal/mongo/EventUpdater.java | 85 --- .../GeoTemporalMongoDBStorageStrategy.java | 299 --------- .../geotemporal/mongo/MongoEventStorage.java | 195 ------ .../mongo/MongoGeoTemporalIndexer.java | 226 ------- .../geotemporal/storage/EventStorage.java | 130 ---- .../mongodb/geo/GeoMongoDBStorageStrategy.java | 247 ------- .../indexing/mongodb/geo/MongoGeoIndexer.java | 154 ----- .../indexing/mongodb/geo/MongoGeoTupleSet.java | 361 ---------- .../rya/indexing/GeoIndexingTestUtils.java | 51 -- .../indexing/accumulo/geo/GeoIndexerSfTest.java | 521 --------------- .../indexing/accumulo/geo/GeoIndexerTest.java | 396 ----------- .../accumulo/geo/GeoWaveFeatureReaderTest.java | 385 ----------- .../accumulo/geo/GeoWaveGTQueryTest.java | 254 ------- .../accumulo/geo/GeoWaveIndexerSfTest.java | 537 --------------- .../accumulo/geo/GeoWaveIndexerTest.java | 448 ------------- 
.../geotemporal/GeoTemporalProviderTest.java | 222 ------ .../geotemporal/GeoTemporalTestBase.java | 140 ---- .../geotemporal/MongoGeoTemporalIndexIT.java | 176 ----- .../geotemporal/model/EventQueryNodeTest.java | 362 ---------- .../mongo/EventDocumentConverterTest.java | 64 -- .../GeoTemporalMongoDBStorageStrategyTest.java | 490 -------------- .../mongo/MongoEventStorageTest.java | 197 ------ .../mongo/MongoGeoTemporalIndexerIT.java | 115 ---- .../indexing/geotemporal/mongo/MongoITBase.java | 64 -- .../indexing/mongo/MongoGeoIndexerSfTest.java | 262 -------- .../rya/indexing/mongo/MongoGeoIndexerTest.java | 370 ---------- pom.xml | 1 + 106 files changed, 12617 insertions(+), 11091 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/common/rya.api/src/main/java/org/apache/rya/api/persist/index/RyaSecondaryIndexer.java ---------------------------------------------------------------------- diff --git a/common/rya.api/src/main/java/org/apache/rya/api/persist/index/RyaSecondaryIndexer.java b/common/rya.api/src/main/java/org/apache/rya/api/persist/index/RyaSecondaryIndexer.java index e1c1819..ef21f1f 100644 --- a/common/rya.api/src/main/java/org/apache/rya/api/persist/index/RyaSecondaryIndexer.java +++ b/common/rya.api/src/main/java/org/apache/rya/api/persist/index/RyaSecondaryIndexer.java @@ -33,7 +33,10 @@ import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.domain.RyaURI; public interface RyaSecondaryIndexer extends Closeable, Flushable, Configurable { - + /** + * initialize after setting configuration. + */ + public void init(); /** * Returns the table name if the implementation supports it. * Note that some indexers use multiple tables, this only returns one. 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/experimental/AccumuloIndexer.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/experimental/AccumuloIndexer.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/experimental/AccumuloIndexer.java index 3e08ef4..4a164a9 100644 --- a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/experimental/AccumuloIndexer.java +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/experimental/AccumuloIndexer.java @@ -29,7 +29,6 @@ import org.apache.rya.api.RdfCloudTripleStoreConfiguration; import org.apache.rya.api.persist.index.RyaSecondaryIndexer; public interface AccumuloIndexer extends RyaSecondaryIndexer { - public void init(); public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws IOException; public void setConnector(Connector connector); public void destroy(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/pom.xml ---------------------------------------------------------------------- diff --git a/extras/pom.xml b/extras/pom.xml index 8823031..82930bc 100644 --- a/extras/pom.xml +++ b/extras/pom.xml @@ -45,6 +45,7 @@ under the License. 
<module>rya.merger</module> <module>rya.giraph</module> <module>rya.benchmark</module> + <module>rya.geoindexing</module> </modules> <profiles> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.common/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/geo.common/pom.xml b/extras/rya.geoindexing/geo.common/pom.xml new file mode 100644 index 0000000..6b4b3ca --- /dev/null +++ b/extras/rya.geoindexing/geo.common/pom.xml @@ -0,0 +1,25 @@ +<?xml version='1.0'?> + +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + you under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. 
--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.geoindexing</artifactId> + <version>3.2.11-incubating-SNAPSHOT</version> + </parent> + + <artifactId>geo.common</artifactId> + <name>Apache Rya Geo Indexing Common Code</name> + +</project> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/GeoEnabledFilterFunctionOptimizer.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/GeoEnabledFilterFunctionOptimizer.java b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/GeoEnabledFilterFunctionOptimizer.java new file mode 100644 index 0000000..6ad0edc --- /dev/null +++ b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/GeoEnabledFilterFunctionOptimizer.java @@ -0,0 +1,353 @@ +package org.apache.rya.indexing; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.commons.lang.Validate; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.indexing.IndexingFunctionRegistry.FUNCTION_TYPE; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer; +import org.apache.rya.indexing.accumulo.freetext.FreeTextTupleSet; +import org.apache.rya.indexing.accumulo.geo.GeoParseUtils; +import org.apache.rya.indexing.accumulo.geo.GeoTupleSet; +import org.apache.rya.indexing.accumulo.geo.OptionalConfigUtils; +import org.apache.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer; +import org.apache.rya.indexing.mongodb.freetext.MongoFreeTextIndexer; +import org.apache.rya.indexing.mongodb.temporal.MongoTemporalIndexer; +import org.openrdf.model.Resource; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.BindingSet; +import org.openrdf.query.Dataset; +import org.openrdf.query.algebra.And; +import org.openrdf.query.algebra.Filter; +import org.openrdf.query.algebra.FunctionCall; +import org.openrdf.query.algebra.Join; +import org.openrdf.query.algebra.LeftJoin; +import org.openrdf.query.algebra.QueryModelNode; +import 
org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.ValueConstant; +import org.openrdf.query.algebra.Var; +import org.openrdf.query.algebra.evaluation.QueryOptimizer; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; + +import com.google.common.collect.Lists; + +public class GeoEnabledFilterFunctionOptimizer implements QueryOptimizer, Configurable { + private static final Logger LOG = Logger.getLogger(GeoEnabledFilterFunctionOptimizer.class); + private final ValueFactory valueFactory = new ValueFactoryImpl(); + + private Configuration conf; + private GeoIndexer geoIndexer; + private FreeTextIndexer freeTextIndexer; + private TemporalIndexer temporalIndexer; + private boolean init = false; + + public GeoEnabledFilterFunctionOptimizer() { + } + + public GeoEnabledFilterFunctionOptimizer(final AccumuloRdfConfiguration conf) throws AccumuloException, AccumuloSecurityException, + TableNotFoundException, IOException, TableExistsException, NumberFormatException, UnknownHostException { + this.conf = conf; + init(); + } + + //setConf initializes FilterFunctionOptimizer so reflection can be used + //to create optimizer in RdfCloudTripleStoreConnection + @Override + public void setConf(final Configuration conf) { + this.conf = conf; + //reset the init. + init = false; + init(); + } + /** + * Load instances of the selected indexers. This is tricky because some (geomesa vs geowave) have incompatible dependencies (geotools versions). + */ + private synchronized void init() { + if (!init) { + if (ConfigUtils.getUseMongo(conf)) { + // create a new MongoGeoIndexer() without having it at compile time. 
+ geoIndexer = instantiate(GeoIndexerType.MONGO_DB.getGeoIndexerClassString(), GeoIndexer.class); + geoIndexer.setConf(conf); + freeTextIndexer = new MongoFreeTextIndexer(); + freeTextIndexer.setConf(conf); + temporalIndexer = new MongoTemporalIndexer(); + temporalIndexer.setConf(conf); + } else { + GeoIndexerType geoIndexerType = OptionalConfigUtils.getGeoIndexerType(conf); + if (geoIndexerType == GeoIndexerType.UNSPECIFIED) { + geoIndexer = instantiate(GeoIndexerType.GEO_MESA.getGeoIndexerClassString(), GeoIndexer.class); + } else { + geoIndexer = instantiate(geoIndexerType.getGeoIndexerClassString(), GeoIndexer.class); + } + geoIndexer.setConf(conf); + freeTextIndexer = new AccumuloFreeTextIndexer(); + freeTextIndexer.setConf(conf); + temporalIndexer = new AccumuloTemporalIndexer(); + temporalIndexer.setConf(conf); + } + init = true; + } + } + + + @Override + public void optimize(final TupleExpr tupleExpr, final Dataset dataset, final BindingSet bindings) { + // find variables used in property and resource based searches: + final SearchVarVisitor searchVars = new SearchVarVisitor(); + tupleExpr.visit(searchVars); + // rewrites for property searches: + processPropertySearches(tupleExpr, searchVars.searchProperties); + + } + + /** + * helper to instantiate a class from a string class name. + * @param className name of class to instantiate. + * @param type base interface that the class immplements + * @return the instance. 
+ */ + public static <T> T instantiate(final String className, final Class<T> type){ + try{ + return type.cast(Class.forName(className).newInstance()); + } catch(InstantiationException + | IllegalAccessException + | ClassNotFoundException e){ + throw new IllegalStateException(e); + } + } + + private void processPropertySearches(final TupleExpr tupleExpr, final Collection<Var> searchProperties) { + final MatchStatementVisitor matchStatements = new MatchStatementVisitor(searchProperties); + tupleExpr.visit(matchStatements); + for (final StatementPattern matchStatement: matchStatements.matchStatements) { + final Var subject = matchStatement.getSubjectVar(); + if (subject.hasValue() && !(subject.getValue() instanceof Resource)) { + throw new IllegalArgumentException("Query error: Found " + subject.getValue() + ", expected an URI or BNode"); + } + Validate.isTrue(subject.hasValue() || subject.getName() != null); + Validate.isTrue(!matchStatement.getObjectVar().hasValue() && matchStatement.getObjectVar().getName() != null); + buildQuery(tupleExpr, matchStatement); + } + } + + private void buildQuery(final TupleExpr tupleExpr, final StatementPattern matchStatement) { + //If our IndexerExpr (to be) is the rhs-child of LeftJoin, we can safely make that a Join: + // the IndexerExpr will (currently) not return results that can deliver unbound variables. + //This optimization should probably be generalized into a LeftJoin -> Join optimizer under certain conditions. Until that + // has been done, this code path at least takes care of queries generated by OpenSahara SparqTool that filter on OPTIONAL + // projections. E.g. summary~'full text search' (summary is optional). 
See #379 + if (matchStatement.getParentNode() instanceof LeftJoin) { + final LeftJoin leftJoin = (LeftJoin)matchStatement.getParentNode(); + if (leftJoin.getRightArg() == matchStatement && leftJoin.getCondition() == null) { + matchStatement.getParentNode().replaceWith(new Join(leftJoin.getLeftArg(), leftJoin.getRightArg())); + } + } + final FilterFunction fVisitor = new FilterFunction(matchStatement.getObjectVar().getName()); + tupleExpr.visit(fVisitor); + final List<IndexingExpr> results = Lists.newArrayList(); + for(int i = 0; i < fVisitor.func.size(); i++){ + results.add(new IndexingExpr(fVisitor.func.get(i), matchStatement, fVisitor.args.get(i))); + } + removeMatchedPattern(tupleExpr, matchStatement, new IndexerExprReplacer(results)); + } + + //find vars contained in filters + private static class SearchVarVisitor extends QueryModelVisitorBase<RuntimeException> { + private final Collection<Var> searchProperties = new ArrayList<Var>(); + + @Override + public void meet(final FunctionCall fn) { + final URI fun = new URIImpl(fn.getURI()); + final Var result = IndexingFunctionRegistry.getResultVarFromFunctionCall(fun, fn.getArgs()); + if (result != null && !searchProperties.contains(result)) { + searchProperties.add(result); + } + } + } + + //find StatementPatterns containing filter variables + private static class MatchStatementVisitor extends QueryModelVisitorBase<RuntimeException> { + private final Collection<Var> propertyVars; + private final Collection<Var> usedVars = new ArrayList<Var>(); + private final List<StatementPattern> matchStatements = new ArrayList<StatementPattern>(); + + public MatchStatementVisitor(final Collection<Var> propertyVars) { + this.propertyVars = propertyVars; + } + + @Override public void meet(final StatementPattern statement) { + final Var object = statement.getObjectVar(); + if (propertyVars.contains(object)) { + if (usedVars.contains(object)) { + throw new IllegalArgumentException("Illegal search, variable is used multiple times as 
object: " + object.getName()); + } else { + usedVars.add(object); + matchStatements.add(statement); + } + } + } + } + + private abstract class AbstractEnhanceVisitor extends QueryModelVisitorBase<RuntimeException> { + final String matchVar; + List<URI> func = Lists.newArrayList(); + List<Object[]> args = Lists.newArrayList(); + + public AbstractEnhanceVisitor(final String matchVar) { + this.matchVar = matchVar; + } + + protected void addFilter(final URI uri, final Object[] values) { + func.add(uri); + args.add(values); + } + } + + //create indexing expression for each filter matching var in filter StatementPattern + //replace old filter condition with true condition + private class FilterFunction extends AbstractEnhanceVisitor { + public FilterFunction(final String matchVar) { + super(matchVar); + } + + @Override + public void meet(final FunctionCall call) { + final URI fnUri = valueFactory.createURI(call.getURI()); + final Var resultVar = IndexingFunctionRegistry.getResultVarFromFunctionCall(fnUri, call.getArgs()); + if (resultVar != null && resultVar.getName().equals(matchVar)) { + addFilter(valueFactory.createURI(call.getURI()), GeoParseUtils.extractArguments(matchVar, call)); + if (call.getParentNode() instanceof Filter || call.getParentNode() instanceof And || call.getParentNode() instanceof LeftJoin) { + call.replaceWith(new ValueConstant(valueFactory.createLiteral(true))); + } else { + throw new IllegalArgumentException("Query error: Found " + call + " as part of an expression that is too complex"); + } + } + } + + @Override + public void meet(final Filter filter) { + //First visit children, then condition (reverse of default): + filter.getArg().visit(this); + filter.getCondition().visit(this); + } + } + + private void removeMatchedPattern(final TupleExpr tupleExpr, final StatementPattern pattern, final TupleExprReplacer replacer) { + final List<TupleExpr> indexTuples = replacer.createReplacement(pattern); + if (indexTuples.size() > 1) { + final 
VarExchangeVisitor vev = new VarExchangeVisitor(pattern); + tupleExpr.visit(vev); + Join join = new Join(indexTuples.remove(0), indexTuples.remove(0)); + for (final TupleExpr geo : indexTuples) { + join = new Join(join, geo); + } + pattern.replaceWith(join); + } else if (indexTuples.size() == 1) { + pattern.replaceWith(indexTuples.get(0)); + pattern.setParentNode(null); + } else { + throw new IllegalStateException("Must have at least one replacement for matched StatementPattern."); + } + } + + private interface TupleExprReplacer { + List<TupleExpr> createReplacement(TupleExpr org); + } + + //replace each filter pertinent StatementPattern with corresponding index expr + private class IndexerExprReplacer implements TupleExprReplacer { + private final List<IndexingExpr> indxExpr; + private final FUNCTION_TYPE type; + + public IndexerExprReplacer(final List<IndexingExpr> indxExpr) { + this.indxExpr = indxExpr; + final URI func = indxExpr.get(0).getFunction(); + type = IndexingFunctionRegistry.getFunctionType(func); + } + + @Override + public List<TupleExpr> createReplacement(final TupleExpr org) { + final List<TupleExpr> indexTuples = Lists.newArrayList(); + switch (type) { + case GEO: + for (final IndexingExpr indx : indxExpr) { + indexTuples.add(new GeoTupleSet(indx, geoIndexer)); + } + break; + case FREETEXT: + for (final IndexingExpr indx : indxExpr) { + indexTuples.add(new FreeTextTupleSet(indx, freeTextIndexer)); + } + break; + case TEMPORAL: + for (final IndexingExpr indx : indxExpr) { + indexTuples.add(new TemporalTupleSet(indx, temporalIndexer)); + } + break; + default: + throw new IllegalArgumentException("Incorrect type!"); + } + return indexTuples; + } + } + + private static class VarExchangeVisitor extends QueryModelVisitorBase<RuntimeException> { + private final StatementPattern exchangeVar; + public VarExchangeVisitor(final StatementPattern sp) { + exchangeVar = sp; + } + + @Override + public void meet(final Join node) { + final QueryModelNode lNode = 
node.getLeftArg(); + if (lNode instanceof StatementPattern) { + exchangeVar.replaceWith(lNode); + node.setLeftArg(exchangeVar); + } else { + super.meet(node); + } + } + } + + @Override + public Configuration getConf() { + return conf; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/GeoIndexer.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/GeoIndexer.java b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/GeoIndexer.java new file mode 100644 index 0000000..d091d32 --- /dev/null +++ b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/GeoIndexer.java @@ -0,0 +1,210 @@ +package org.apache.rya.indexing; + +import org.openrdf.model.Statement; +import org.openrdf.query.QueryEvaluationException; + +import com.vividsolutions.jts.geom.Geometry; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + + +import info.aduna.iteration.CloseableIteration; +import org.apache.rya.api.persist.index.RyaSecondaryIndexer; +import org.apache.rya.indexing.accumulo.geo.GeoTupleSet.GeoSearchFunctionFactory.NearQuery; + +/** + * A repository to store, index, and retrieve {@link Statement}s based on geospatial features. + */ +public interface GeoIndexer extends RyaSecondaryIndexer { + /** + * Returns statements that contain a geometry that is equal to the queried {@link Geometry} and meet the {@link StatementConstraints}. + * + * <p> + * From Wikipedia (http://en.wikipedia.org/wiki/DE-9IM): + * <ul> + * <li> + * "Two geometries are topologically equal if their interiors intersect and no part of the interior or boundary of one geometry intersects the exterior of the other" + * <li>"A is equal to B if A is within B and A contains B" + * </ul> + * + * @param query + * the queried geometry + * @param contraints + * the {@link StatementConstraints} + * @return + */ + public abstract CloseableIteration<Statement, QueryEvaluationException> queryEquals(Geometry query, StatementConstraints contraints); + + /** + * Returns statements that contain a geometry that is disjoint to the queried {@link Geometry} and meet the {@link StatementConstraints}. + * + * <p> + * From Wikipedia (http://en.wikipedia.org/wiki/DE-9IM): + * <ul> + * <li>"A and B are disjoint if they have no point in common. They form a set of disconnected geometries." + * <li>"A and B are disjoint if A does not intersect B" + * </ul> + * + * @param query + * the queried geometry + * @param contraints + * the {@link StatementConstraints} + * @return + */ + public abstract CloseableIteration<Statement, QueryEvaluationException> queryDisjoint(Geometry query, StatementConstraints contraints); + + /** + * Returns statements that contain a geometry that Intersects the queried {@link Geometry} and meet the {@link StatementConstraints}. 
+     *
+     * <p>
+     * From Wikipedia (http://en.wikipedia.org/wiki/DE-9IM):
+     * <ul>
+     * <li>"a intersects b: geometries a and b have at least one point in common."
+     * <li>"not Disjoint"
+     * </ul>
+     *
+     *
+     * @param query
+     *            the queried geometry
+     * @param contraints
+     *            the {@link StatementConstraints}
+     * @return the matching statements
+     */
+    public abstract CloseableIteration<Statement, QueryEvaluationException> queryIntersects(Geometry query, StatementConstraints contraints);
+
+    /**
+     * Returns statements that contain a geometry that Touches the queried {@link Geometry} and meet the {@link StatementConstraints}.
+     *
+     * <p>
+     * From Wikipedia (http://en.wikipedia.org/wiki/DE-9IM):
+     * <ul>
+     * <li>"a touches b, they have at least one boundary point in common, but no interior points."
+     * </ul>
+     *
+     *
+     * @param query
+     *            the queried geometry
+     * @param contraints
+     *            the {@link StatementConstraints}
+     * @return the matching statements
+     */
+    public abstract CloseableIteration<Statement, QueryEvaluationException> queryTouches(Geometry query, StatementConstraints contraints);
+
+    /**
+     * Returns statements that contain a geometry that crosses the queried {@link Geometry} and meet the {@link StatementConstraints}.
+     *
+     * <p>
+     * From Wikipedia (http://en.wikipedia.org/wiki/DE-9IM):
+     * <ul>
+     * <li>
+     * "a crosses b, they have some but not all interior points in common (and the dimension of the intersection is less than that of at least one of them)."
+     * </ul>
+     *
+     * @param query
+     *            the queried geometry
+     * @param contraints
+     *            the {@link StatementConstraints}
+     * @return the matching statements
+     */
+    public abstract CloseableIteration<Statement, QueryEvaluationException> queryCrosses(Geometry query, StatementConstraints contraints);
+
+    /**
+     * Returns statements that contain a geometry that is Within the queried {@link Geometry} and meet the {@link StatementConstraints}.
+     *
+     * <p>
+     * From Wikipedia (http://en.wikipedia.org/wiki/DE-9IM):
+     * <ul>
+     * <li>"a is within b, a lies in the interior of b"
+     * <li>Same as: "Contains(b,a)"
+     * </ul>
+     *
+     *
+     * @param query
+     *            the queried geometry
+     * @param contraints
+     *            the {@link StatementConstraints}
+     * @return the matching statements
+     */
+    public abstract CloseableIteration<Statement, QueryEvaluationException> queryWithin(Geometry query, StatementConstraints contraints);
+
+    /**
+     * Returns statements that contain a geometry that Contains the queried {@link Geometry} and meet the {@link StatementConstraints}.
+     *
+     * <p>
+     * From Wikipedia (http://en.wikipedia.org/wiki/DE-9IM):
+     * <ul>
+     * <li>b is within a. Geometry b lies in the interior of a. Another definition:
+     * "a 'contains' b iff no points of b lie in the exterior of a, and at least one point of the interior of b lies in the interior of a"
+     * <li>Same: Within(b,a)
+     * </ul>
+     *
+     *
+     * @param query
+     *            the queried geometry
+     * @param contraints
+     *            the {@link StatementConstraints}
+     * @return the matching statements
+     */
+    public abstract CloseableIteration<Statement, QueryEvaluationException> queryContains(Geometry query, StatementConstraints contraints);
+
+    /**
+     * Returns statements that contain a geometry that Overlaps the queried {@link Geometry} and meet the {@link StatementConstraints}.
+     *
+     * <p>
+     * DE-9IM (http://en.wikipedia.org/wiki/DE-9IM):
+     * <ul>
+     * <li>a overlaps b: they have some but not all points in common, they have the same dimension, and the intersection of
+     * their interiors has the same dimension as the geometries themselves.
+     * </ul>
+     *
+     *
+     * @param query
+     *            the queried geometry
+     * @param contraints
+     *            the {@link StatementConstraints}
+     * @return the matching statements
+     */
+    public abstract CloseableIteration<Statement, QueryEvaluationException> queryOverlaps(Geometry query, StatementConstraints contraints);
+
+    /**
+     * Returns statements that contain a geometry that is near the queried {@link Geometry} and meet the {@link StatementConstraints}.
+     * <p>
+     * A geometry is considered near if it is within the min/max distances specified in the provided {@link NearQuery}. This will make a disc (specify max),
+     * a donut (specify both), or a spheroid complement disc (specify min)
+     * <p>
+     * The distances are specified in meters and must be >= 0.
+     * <p>
+     * To specify max/min distances:
+     * <ul>
+     * <li>Enter parameters in order MAX, MIN -- Donut</li>
+     * <li>Omit the MIN -- Disc</li>
+     * <li>Enter 0 for MAX, and Enter parameter for MIN -- Spheroid complement disc</li>
+     * <li>Omit both -- Default max/min [TODO: Find these values]</li>
+     * </ul>
+     * <p>
+     * Note: This query will not fail if the min is greater than the max, it will just return no results.
+     *
+     * @param query the queried geometry, with Optional min and max distance fields.
+     * @param contraints the {@link StatementConstraints}
+     * @return the matching statements
+     */
+    public abstract CloseableIteration<Statement, QueryEvaluationException> queryNear(NearQuery query, StatementConstraints contraints);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/GeoIndexerType.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/GeoIndexerType.java b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/GeoIndexerType.java
new file mode 100644
index 0000000..9bb613b
--- /dev/null
+++ b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/GeoIndexerType.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+
+/**
+ * A list of all the types of Geo indexers supported in Rya.
+ */
+public enum GeoIndexerType {
+    /**
+     * Geo Mesa based indexer.
+     */
+    GEO_MESA("org.apache.rya.indexing.accumulo.geo.GeoMesaGeoIndexer"),
+    /**
+     * Geo Wave based indexer.
+     */
+    GEO_WAVE("org.apache.rya.indexing.accumulo.geo.GeoWaveGeoIndexer"),
+    /**
+     * MongoDB based indexer.
+     */
+    MONGO_DB("org.apache.rya.indexing.mongodb.geo.MongoGeoIndexer"),
+    /**
+     * No type was specified, so use the default.
+     */
+    UNSPECIFIED("no_index_was_configured");
+
+    private final String geoIndexerClassString;
+
+    /**
+     * Creates a new {@link GeoIndexerType}.
+     * @param geoIndexerClassString the fully qualified name of the {@link GeoIndexer}
+     * implementation class. (not {@code null})
+     */
+    private GeoIndexerType(final String geoIndexerClassString) {
+        this.geoIndexerClassString = checkNotNull(geoIndexerClassString);
+    }
+
+    /**
+     * @return the fully qualified {@link GeoIndexer} class name. (not {@code null})
+     */
+    public String getGeoIndexerClassString() {
+        return geoIndexerClassString;
+    }
+
+    /**
+     * @return True if the indexer class can be loaded by this enum's class loader; false if it is
+     * missing or cannot be linked (e.g. a class it depends on is absent from the classpath).
+     */
+    public boolean isOnClassPath() {
+        try {
+            Class.forName(geoIndexerClassString, false, this.getClass().getClassLoader());
+            return true;
+        } catch (final ClassNotFoundException | LinkageError e) {
+            // Not loadable from here, so the indexer cannot be used; report that instead of crashing the probe.
+            return false;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/GeoIndexingTestUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/GeoIndexingTestUtils.java b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/GeoIndexingTestUtils.java
new file mode 100644
index 0000000..b0c636d
--- /dev/null
+++ b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/GeoIndexingTestUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import info.aduna.iteration.CloseableIteration;
+
+/**
+ * Utility methods to help test geo indexing methods.
+ */
+public final class GeoIndexingTestUtils {
+    /**
+     * Private constructor to prevent instantiation.
+     */
+    private GeoIndexingTestUtils () {
+    }
+
+    /**
+     * Drains the specified iteration into a set and then closes it.
+     * @param iter a {@link CloseableIteration}. It is closed when this method returns.
+     * @return the {@link Set} of items from the iteration, or an empty set if
+     * none were found.
+     * @throws Exception if reading from or closing the iteration fails.
+     */
+    public static <X> Set<X> getSet(final CloseableIteration<X, ?> iter) throws Exception {
+        final Set<X> set = new HashSet<>();
+        try {
+            while (iter.hasNext()) {
+                set.add(iter.next());
+            }
+        } finally {
+            // Release the iteration's resources even if reading it fails.
+            iter.close();
+        }
+        return set;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/GeoRyaSailFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/GeoRyaSailFactory.java b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/GeoRyaSailFactory.java
new file mode 100644
index 0000000..3c01bf6
--- /dev/null
+++ b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/GeoRyaSailFactory.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing;
+
+import static java.util.Objects.requireNonNull;
+
+import java.net.UnknownHostException;
+import java.util.Objects;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.hadoop.conf.Configuration;
+import org.openrdf.sail.Sail;
+import org.openrdf.sail.SailException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.MongoClient;
+
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.AccumuloRyaDAO;
+import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
+import org.apache.rya.api.instance.RyaDetailsToConfiguration;
+import org.apache.rya.api.layout.TablePrefixLayoutStrategy;
+import org.apache.rya.api.persist.RyaDAO;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.accumulo.geo.OptionalConfigUtils;
+import org.apache.rya.mongodb.MongoConnectorFactory;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.mongodb.MongoDBRyaDAO;
+import org.apache.rya.mongodb.instance.MongoRyaInstanceDetailsRepository;
+import org.apache.rya.rdftriplestore.RdfCloudTripleStore;
+import org.apache.rya.rdftriplestore.inference.InferenceEngine;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+
+public class GeoRyaSailFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(GeoRyaSailFactory.class);
+
+    /**
+     * Creates an instance of {@link Sail} that is attached to a Rya instance.
+     *
+     * @param conf - Configures how the Sail object will be constructed. (not null)
+     * @return A {@link Sail} object that is backed by a Rya datastore.
+     * @throws AccumuloException The Accumulo connector or DAO could not be created.
+     * @throws AccumuloSecurityException Accumulo reported a security violation (e.g. bad credentials).
+     * @throws RyaDAOException The Rya DAO could not be created or initialized.
+     * @throws InferenceEngineException The inference engine could not be initialized.
+     * @throws SailException The object could not be created.
+     */
+    public static Sail getInstance(final Configuration conf) throws AccumuloException,
+            AccumuloSecurityException, RyaDAOException, InferenceEngineException, SailException {
+        requireNonNull(conf);
+        return getRyaSail(conf);
+    }
+
+    private static Sail getRyaSail(final Configuration config) throws InferenceEngineException, RyaDAOException, AccumuloException, AccumuloSecurityException, SailException {
+        final RdfCloudTripleStore store = new RdfCloudTripleStore();
+        final RyaDAO<?> dao;
+        final RdfCloudTripleStoreConfiguration rdfConfig;
+        // XXX Should(?) be MongoDBRdfConfiguration.MONGO_COLLECTION_PREFIX inside the if below. RYA-135
+        final String ryaInstance = config.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX);
+        Objects.requireNonNull(ryaInstance, "RyaInstance or table prefix is missing from configuration: " + RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX);
+
+        if (ConfigUtils.getUseMongo(config)) {
+            final MongoDBRdfConfiguration mongoConfig = new MongoDBRdfConfiguration(config);
+            rdfConfig = mongoConfig;
+            final MongoClient client = MongoConnectorFactory.getMongoClient(config);
+            try {
+                final MongoRyaInstanceDetailsRepository ryaDetailsRepo = new MongoRyaInstanceDetailsRepository(client, mongoConfig.getCollectionName());
+                RyaDetailsToConfiguration.addRyaDetailsToConfiguration(ryaDetailsRepo.getRyaInstanceDetails(), mongoConfig);
+            } catch (final RyaDetailsRepositoryException e) {
+                LOG.info("Instance does not have a rya details collection, skipping.");
+            }
+            dao = getMongoDAO(mongoConfig, client);
+        } else {
+            final AccumuloRdfConfiguration accumuloConfig = new AccumuloRdfConfiguration(config);
+            rdfConfig = accumuloConfig;
+            final String user = accumuloConfig.get(ConfigUtils.CLOUDBASE_USER);
+            final String pswd = accumuloConfig.get(ConfigUtils.CLOUDBASE_PASSWORD);
+            Objects.requireNonNull(user, "Accumulo user name is missing from configuration: " + ConfigUtils.CLOUDBASE_USER);
+            Objects.requireNonNull(pswd, "Accumulo user password is missing from configuration: " + ConfigUtils.CLOUDBASE_PASSWORD);
+            accumuloConfig.setTableLayoutStrategy(new TablePrefixLayoutStrategy(ryaInstance));
+            RyaSailFactory.updateAccumuloConfig(accumuloConfig, user, pswd, ryaInstance);
+            dao = getAccumuloDAO(accumuloConfig);
+        }
+        store.setRyaDAO(dao);
+        rdfConfig.setTablePrefix(ryaInstance);
+        if (rdfConfig.isInfer()) {
+            final InferenceEngine inferenceEngine = new InferenceEngine();
+            inferenceEngine.setConf(rdfConfig);
+            inferenceEngine.setRyaDAO(dao);
+            inferenceEngine.init();
+            store.setInferenceEngine(inferenceEngine);
+        }
+
+        store.initialize();
+        return store;
+    }
+
+    /** Calls {@link OptionalConfigUtils#setIndexers} on {@code config}, then builds and initializes a Mongo DAO (reusing {@code client} when not null). */
+    private static MongoDBRyaDAO getMongoDAO(final MongoDBRdfConfiguration config, final MongoClient client) throws RyaDAOException {
+        OptionalConfigUtils.setIndexers(config);
+        final MongoDBRyaDAO dao;
+        if (client != null) {
+            dao = new MongoDBRyaDAO(config, client);
+        } else {
+            try {
+                dao = new MongoDBRyaDAO(config);
+            } catch (NumberFormatException | UnknownHostException e) {
+                throw new RyaDAOException("Unable to connect to mongo at the configured location.", e);
+            }
+        }
+        dao.init();
+        return dao;
+    }
+
+    /** Builds and initializes an Accumulo DAO after {@link OptionalConfigUtils#setIndexers}; note it always enables query plan display. */
+    private static AccumuloRyaDAO getAccumuloDAO(final AccumuloRdfConfiguration config) throws AccumuloException, AccumuloSecurityException, RyaDAOException {
+        final Connector connector = ConfigUtils.getConnector(config);
+        final AccumuloRyaDAO dao = new AccumuloRyaDAO();
+        dao.setConnector(connector);
+        OptionalConfigUtils.setIndexers(config);
+        config.setDisplayQueryPlan(true);
+        dao.setConf(config);
+        dao.init();
+        return dao;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/GeoTemporalIndexerType.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/GeoTemporalIndexerType.java b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/GeoTemporalIndexerType.java
new file mode 100644
index 0000000..311af93
--- /dev/null
+++ b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/GeoTemporalIndexerType.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+
+/**
+ * A list of all the types of GeoTemporal indexers (and their companion
+ * query optimizer) supported in Rya.
+ */
+public enum GeoTemporalIndexerType {
+    /**
+     * MongoDB based GeoTemporal index; it goes together with
+     * {@link #MONGO_GEO_TEMPORAL_OPTIMIZER}.
+     */
+    MONGO_GEO_TEMPORAL("org.apache.rya.indexing.geotemporal.mongo.MongoGeoTemporalIndexer"),
+    /**
+     * GeoTemporal query optimizer; it goes together with {@link #MONGO_GEO_TEMPORAL}.
+     */
+    MONGO_GEO_TEMPORAL_OPTIMIZER("org.apache.rya.indexing.geotemporal.GeoTemporalOptimizer"),
+    /**
+     * No type was specified, so use the default.
+     */
+    UNSPECIFIED("no_index_was_configured");
+
+    private final String geoTemporalIndexerClassString;
+
+    /**
+     * Creates a new {@link GeoTemporalIndexerType}.
+     * @param geoIndexerClassString the fully qualified name of the class this type
+     * refers to. (not {@code null})
+     */
+    private GeoTemporalIndexerType(final String geoIndexerClassString) {
+        this.geoTemporalIndexerClassString = checkNotNull(geoIndexerClassString);
+    }
+
+    /**
+     * @return the fully qualified name of the class this type refers to. (not {@code null})
+     */
+    public String getGeoTemporalIndexerClassString() {
+        return geoTemporalIndexerClassString;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoParseUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoParseUtils.java b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoParseUtils.java
new file mode 100644
index 0000000..779a61e
--- /dev/null
+++ b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoParseUtils.java
@@ -0,0 +1,172 @@
+package org.apache.rya.indexing.accumulo.geo;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
+
+import javax.xml.parsers.ParserConfigurationException;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.GeoConstants;
+import org.openrdf.model.Literal;
+import org.openrdf.model.Statement;
+import org.openrdf.model.Value;
+import org.openrdf.query.algebra.FunctionCall;
+import org.openrdf.query.algebra.ValueConstant;
+import org.openrdf.query.algebra.ValueExpr;
+import org.openrdf.query.algebra.Var;
+import org.xml.sax.SAXException;
+
+import com.vividsolutions.jts.geom.Geometry;
+import com.vividsolutions.jts.io.ParseException;
+import com.vividsolutions.jts.io.WKTReader;
+
+/**
+ * Parses RDF oriented GML and well known text (WKT) literals into JTS {@link Geometry} objects.
+ * GML parsing is delegated to a caller supplied {@link GmlToGeometryParser}, so this class does
+ * not depend on geo tools directly; your implementation can use whatever version you like.
+ */
+public class GeoParseUtils {
+    static final Logger logger = Logger.getLogger(GeoParseUtils.class);
+
+    /**
+     * @deprecated Not needed since geo literals may be WKT or GML.
+     *
+     *    This method warns on a condition that must already be tested.  Replaced by
+     *    {@link #getLiteral(Statement)} and {@link #getGeometry(Statement, GmlToGeometryParser)}
+     *    and getLiteral(statement).getLabel()
+     *    and getLiteral(statement).getDatatype()
+     */
+    @Deprecated
+    public static String getWellKnownText(final Statement statement) throws ParseException {
+        final Literal lit = getLiteral(statement);
+        if (!GeoConstants.XMLSCHEMA_OGC_WKT.equals(lit.getDatatype())) {
+            logger.warn("Literal is not of type " + GeoConstants.XMLSCHEMA_OGC_WKT + ": " + statement.toString());
+        }
+        return lit.getLabel();
+    }
+
+    /** Returns the object of {@code statement} as a {@link Literal}; throws {@link ParseException} if it is not one. */
+    public static Literal getLiteral(final Statement statement) throws ParseException {
+        final Value v = statement.getObject();
+        if (!(v instanceof Literal)) {
+            throw new ParseException("Statement does not contain Literal: " + statement.toString());
+        }
+        return (Literal) v;
+    }
+
+    /**
+     * Parses a WKT or GML literal into a {@link Geometry}.
+     *
+     * @param statement - whose object is a WKT or GML typed literal.
+     * @param gmlToGeometryParser - used only when the literal is GML.
+     * @return the parsed geometry.
+     * @throws ParseException if the object is not a literal, has an unsupported datatype, or cannot be parsed.
+     */
+    public static Geometry getGeometry(final Statement statement, final GmlToGeometryParser gmlToGeometryParser) throws ParseException {
+        // handle GML or WKT
+        final Literal lit = getLiteral(statement);
+        if (GeoConstants.XMLSCHEMA_OGC_WKT.equals(lit.getDatatype())) {
+            return new WKTReader().read(lit.getLabel());
+        } else if (GeoConstants.XMLSCHEMA_OGC_GML.equals(lit.getDatatype())) {
+            try {
+                return getGeometryGml(lit.getLabel(), gmlToGeometryParser);
+            } catch (IOException | SAXException | ParserConfigurationException e) {
+                throw new ParseException(e);
+            }
+        } else {
+            throw new ParseException("Literal is unknown geo type, expecting WKT or GML: " + statement.toString());
+        }
+    }
+    /**
+     * Convert GML/XML string into a geometry that can be indexed.
+     * @param gmlString - the GML document to parse.
+     * @param gmlToGeometryParser - the parser that performs the actual GML decoding.
+     * @return the parsed geometry.
+     * @throws IOException propagated from {@code gmlToGeometryParser}.
+     * @throws SAXException propagated from {@code gmlToGeometryParser}.
+     * @throws ParserConfigurationException propagated from {@code gmlToGeometryParser}.
+     */
+    public static Geometry getGeometryGml(final String gmlString, final GmlToGeometryParser gmlToGeometryParser) throws IOException, SAXException, ParserConfigurationException {
+        final Reader reader = new StringReader(gmlString);
+        final Geometry geometry = gmlToGeometryParser.parse(reader);
+        // This sometimes gets populated with the SRS/CRS: geometry.getUserData()
+        // Always returns 0 : geometry.getSRID()
+        //TODO geometry.setUserData(some default CRS); OR geometry.setSRID(some default CRS)
+        return geometry;
+    }
+
+    /**
+     * Extracts the arguments of a {@link FunctionCall}, omitting the first one that is the variable {@code matchName}.
+     * @param matchName - The variable name to match to arguments used in the {@link FunctionCall}.
+     * @param call - The {@link FunctionCall} to match against.
+     * @return - The remaining arguments in call order; constants and bound variables are replaced by their {@link Value}s.
+     * @throws IllegalArgumentException if no argument is the variable {@code matchName}.
+     */
+    public static Object[] extractArguments(final String matchName, final FunctionCall call) {
+        final Object[] args = new Object[Math.max(call.getArgs().size() - 1, 0)];
+        boolean matched = false;
+        int argI = 0;
+        for (final ValueExpr arg : call.getArgs()) {
+            if (!matched && arg instanceof Var && matchName.equals(((Var) arg).getName())) {
+                matched = true; // only the first occurrence is omitted
+            } else if (argI < args.length) { // false only when matchName never appears
+                if (arg instanceof ValueConstant) {
+                    args[argI] = ((ValueConstant) arg).getValue();
+                } else if (arg instanceof Var && ((Var) arg).hasValue()) {
+                    args[argI] = ((Var) arg).getValue();
+                } else {
+                    args[argI] = arg;
+                }
+                ++argI;
+            }
+        }
+        if (!matched) {
+            throw new IllegalArgumentException("Variable '" + matchName + "' is not an argument of " + call);
+        }
+        return args;
+    }
+
+    /**
+     * Wraps the geotools (or any other) GML parser so this class does not depend on it directly.
+     */
+    public interface GmlToGeometryParser {
+        /**
+         * Parses GML from {@code reader}. Implemented code should look like this:
+         * <pre>
+         *     import org.geotools.gml3.GMLConfiguration;
+         *     import org.geotools.xml.Parser;
+         *     final Parser gmlParser = new Parser(new GMLConfiguration());
+         *     return (Geometry) gmlParser.parse(reader);
+         * </pre>
+         * @param reader
+         *            contains the gml to parse. use StringReader to adapt.
+         * @return a JTS geometry
+         * @throws ParserConfigurationException
+         * @throws SAXException
+         * @throws IOException
+         */
+        Geometry parse(final Reader reader) throws IOException, SAXException, ParserConfigurationException;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoTupleSet.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoTupleSet.java b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoTupleSet.java
new file mode 100644
index 0000000..888c099
--- /dev/null
+++ b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoTupleSet.java
@@ -0,0 +1,498 @@
+package org.apache.rya.indexing.accumulo.geo;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.indexing.GeoConstants;
+import org.apache.rya.indexing.GeoIndexer;
+import org.apache.rya.indexing.IndexingExpr;
+import org.apache.rya.indexing.IteratorFactory;
+import org.apache.rya.indexing.SearchFunction;
+import org.apache.rya.indexing.StatementConstraints;
+import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.Var;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Maps;
+import com.vividsolutions.jts.geom.Geometry;
+import com.vividsolutions.jts.io.ParseException;
+import com.vividsolutions.jts.io.WKTReader;
+
+/*
+ * Licensed
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import info.aduna.iteration.CloseableIteration; + +//Indexing Node for geo expressions to be inserted into execution plan +//to delegate geo portion of query to geo index +public class GeoTupleSet extends ExternalTupleSet { + private static final String NEAR_DELIM = "::"; + private final Configuration conf; + private final GeoIndexer geoIndexer; + private final IndexingExpr filterInfo; + + + public GeoTupleSet(final IndexingExpr filterInfo, final GeoIndexer geoIndexer) { + this.filterInfo = filterInfo; + this.geoIndexer = geoIndexer; + conf = geoIndexer.getConf(); + } + + @Override + public Set<String> getBindingNames() { + return filterInfo.getBindingNames(); + } + + @Override + public GeoTupleSet clone() { + return new GeoTupleSet(filterInfo, geoIndexer); + } + + @Override + public double cardinality() { + return 0.0; // No idea how the estimate cardinality here. 
+ } + + + @Override + public String getSignature() { + return "(GeoTuple Projection) " + "variables: " + Joiner.on(", ").join(getBindingNames()).replaceAll("\\s+", " "); + } + + + + @Override + public boolean equals(final Object other) { + if (other == this) { + return true; + } + if (!(other instanceof GeoTupleSet)) { + return false; + } + final GeoTupleSet arg = (GeoTupleSet) other; + return filterInfo.equals(arg.filterInfo); + } + + @Override + public int hashCode() { + int result = 17; + result = 31*result + filterInfo.hashCode(); + + return result; + } + + + + /** + * Returns an iterator over the result set of the contained IndexingExpr. + * <p> + * Should be thread-safe (concurrent invocation {@link OfflineIterable} this + * method can be expected with some query evaluators. + */ + @Override + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final BindingSet bindings) + throws QueryEvaluationException { + + final URI funcURI = filterInfo.getFunction(); + final SearchFunction searchFunction = new GeoSearchFunctionFactory(conf, geoIndexer).getSearchFunction(funcURI); + + String queryText; + Object arg = filterInfo.getArguments()[0]; + if (arg instanceof Value) { + queryText = ((Value) arg).stringValue(); + } else if (arg instanceof Var) { + queryText = bindings.getBinding(((Var) arg).getName()).getValue().stringValue(); + } else { + throw new IllegalArgumentException("Query text was not resolved"); + } + + if(funcURI.equals(GeoConstants.GEO_SF_NEAR)) { + if (filterInfo.getArguments().length > 3) { + throw new IllegalArgumentException("Near functions do not support more than four arguments."); + } + + final List<String> valueList = new ArrayList<>(); + for (final Object val : filterInfo.getArguments()) { + if (val instanceof Value) { + valueList.add(((Value)val).stringValue()); + } else if (val instanceof Var) { + valueList.add(bindings.getBinding(((Var) val).getName()).getValue().stringValue()); + } else { + throw new 
IllegalArgumentException("Query text was not resolved"); + } + } + queryText = String.join(NEAR_DELIM, valueList); + } else if (filterInfo.getArguments().length > 1) { + throw new IllegalArgumentException("Index functions do not support more than two arguments."); + } + + try { + final CloseableIteration<BindingSet, QueryEvaluationException> iterrez = IteratorFactory + .getIterator(filterInfo.getSpConstraint(), bindings, + queryText, searchFunction); + return iterrez; + } catch (final Exception e) { + System.out.println(e.getMessage()); + throw e; + } + } + + //returns appropriate search function for a given URI + //search functions used in GeoMesaGeoIndexer to access index + public static class GeoSearchFunctionFactory { + + Configuration conf; + + private final Map<URI, SearchFunction> SEARCH_FUNCTION_MAP = Maps.newHashMap(); + + private final GeoIndexer geoIndexer; + + public GeoSearchFunctionFactory(final Configuration conf, final GeoIndexer geoIndexer) { + this.conf = conf; + this.geoIndexer = geoIndexer; + } + + + /** + * Get a {@link GeoSearchFunction} for a given URI. 
+         *
+         * @param searchFunction the URI of the filter function to look up; expected to be one of the
+         *            {@code GeoConstants.GEO_SF_*} URIs registered in this factory.
+         * @return the matching {@link SearchFunction}, or {@code null} if {@code searchFunction} is not
+         *         registered (the lookup failure's stack trace is printed to stderr).
+         */
+        public SearchFunction getSearchFunction(final URI searchFunction) {
+            SearchFunction geoFunc = null;
+            try {
+                geoFunc = getSearchFunctionInternal(searchFunction);
+            } catch (final QueryEvaluationException e) {
+                // NOTE(review): an unknown URI is only reported on stderr and surfaces as a null
+                // return; callers must null-check before calling performSearch().
+                e.printStackTrace();
+            }
+            return geoFunc;
+        }
+
+        /**
+         * Looks up the registered {@link SearchFunction} for a filter function URI.
+         *
+         * @param searchFunction the filter function URI.
+         * @return the registered function; never {@code null}.
+         * @throws QueryEvaluationException if no function is registered for {@code searchFunction}.
+         */
+        private SearchFunction getSearchFunctionInternal(final URI searchFunction) throws QueryEvaluationException {
+            final SearchFunction sf = SEARCH_FUNCTION_MAP.get(searchFunction);
+            if (sf == null) {
+                throw new QueryEvaluationException("Unknown Search Function: " + searchFunction.stringValue());
+            }
+            return sf;
+        }
+
+        /**
+         * Parses the Well-Known Text (WKT) query argument of a geo search into a {@link Geometry}.
+         *
+         * @param wkt the WKT text of the query geometry.
+         * @return the parsed geometry.
+         * @throws QueryEvaluationException if {@code wkt} cannot be parsed; the parse error is kept as the cause.
+         */
+        private Geometry readQueryGeometry(final String wkt) throws QueryEvaluationException {
+            try {
+                return new WKTReader().read(wkt);
+            } catch (final ParseException e) {
+                throw new QueryEvaluationException(e);
+            }
+        }
+
+        /**
+         * Base for the search functions whose query text is a single WKT geometry and which delegate
+         * to one spatial-relation query of {@code geoIndexer}. Subclasses only pick the query method;
+         * parsing, error wrapping and {@link #toString()} are shared here.
+         */
+        private abstract class GeometryRelationSearchFunction implements SearchFunction {
+            /** Name returned by {@link #toString()}, e.g. {@code "GEO_EQUALS"}. */
+            private final String name;
+
+            GeometryRelationSearchFunction(final String name) {
+                this.name = name;
+            }
+
+            /**
+             * Runs this function's spatial-relation query against the indexer.
+             *
+             * @param geometry the parsed query geometry.
+             * @param constraints the statement constraints passed to the search.
+             * @return the matching statements.
+             * @throws QueryEvaluationException if the query fails.
+             */
+            protected abstract CloseableIteration<Statement, QueryEvaluationException> query(Geometry geometry,
+                    StatementConstraints constraints) throws QueryEvaluationException;
+
+            @Override
+            public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String queryText,
+                    final StatementConstraints constraints) throws QueryEvaluationException {
+                return query(readQueryGeometry(queryText), constraints);
+            }
+
+            @Override
+            public String toString() {
+                return name;
+            }
+        }
+
+        private final SearchFunction GEO_EQUALS = new GeometryRelationSearchFunction("GEO_EQUALS") {
+            @Override
+            protected CloseableIteration<Statement, QueryEvaluationException> query(final Geometry geometry,
+                    final StatementConstraints constraints) throws QueryEvaluationException {
+                return geoIndexer.queryEquals(geometry, constraints);
+            }
+        };
+
+        private final SearchFunction GEO_DISJOINT = new GeometryRelationSearchFunction("GEO_DISJOINT") {
+            @Override
+            protected CloseableIteration<Statement, QueryEvaluationException> query(final Geometry geometry,
+                    final StatementConstraints constraints) throws QueryEvaluationException {
+                return geoIndexer.queryDisjoint(geometry, constraints);
+            }
+        };
+
+        private final SearchFunction GEO_INTERSECTS = new GeometryRelationSearchFunction("GEO_INTERSECTS") {
+            @Override
+            protected CloseableIteration<Statement, QueryEvaluationException> query(final Geometry geometry,
+                    final StatementConstraints constraints) throws QueryEvaluationException {
+                return geoIndexer.queryIntersects(geometry, constraints);
+            }
+        };
+
+        private final SearchFunction GEO_TOUCHES = new GeometryRelationSearchFunction("GEO_TOUCHES") {
+            @Override
+            protected CloseableIteration<Statement, QueryEvaluationException> query(final Geometry geometry,
+                    final StatementConstraints constraints) throws QueryEvaluationException {
+                return geoIndexer.queryTouches(geometry, constraints);
+            }
+        };
+
+        private final SearchFunction GEO_CONTAINS = new GeometryRelationSearchFunction("GEO_CONTAINS") {
+            @Override
+            protected CloseableIteration<Statement, QueryEvaluationException> query(final Geometry geometry,
+                    final StatementConstraints constraints) throws QueryEvaluationException {
+                return geoIndexer.queryContains(geometry, constraints);
+            }
+        };
+
+        private final SearchFunction GEO_OVERLAPS = new GeometryRelationSearchFunction("GEO_OVERLAPS") {
+            @Override
+            protected CloseableIteration<Statement, QueryEvaluationException> query(final Geometry geometry,
+                    final StatementConstraints constraints) throws QueryEvaluationException {
+                return geoIndexer.queryOverlaps(geometry, constraints);
+            }
+        };
+
+        private final SearchFunction GEO_CROSSES = new GeometryRelationSearchFunction("GEO_CROSSES") {
+            @Override
+            protected CloseableIteration<Statement, QueryEvaluationException> query(final Geometry geometry,
+                    final StatementConstraints constraints) throws QueryEvaluationException {
+                return geoIndexer.queryCrosses(geometry, constraints);
+            }
+        };
+
+        private final SearchFunction GEO_WITHIN = new GeometryRelationSearchFunction("GEO_WITHIN") {
+            @Override
+            protected CloseableIteration<Statement, QueryEvaluationException> query(final Geometry geometry,
+                    final StatementConstraints constraints) throws QueryEvaluationException {
+                return geoIndexer.queryWithin(geometry, constraints);
+            }
+        };
+
+        /**
+         * Near search. The query text is a WKT geometry, optionally followed by up to two
+         * {@code NEAR_DELIM}-separated numbers: first the max distance, then the min distance.
+         */
+        private final SearchFunction GEO_NEAR = new SearchFunction() {
+            @Override
+            public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String queryText,
+                    final StatementConstraints constraints) throws QueryEvaluationException {
+                final String[] args = queryText.split(NEAR_DELIM);
+                Optional<Double> maxDistanceOpt = Optional.empty();
+                Optional<Double> minDistanceOpt = Optional.empty();
+
+                for (int ii = 1; ii < args.length; ii++) {
+                    String numArg = args[ii];
+
+                    // Remove leading 0's, since NumberUtils.isNumber() would treat the value as octal.
+                    while (numArg.startsWith("0")) {
+                        numArg = numArg.substring(1);
+                    }
+
+                    if (numArg.isEmpty()) {
+                        // The argument was all zeros: it sets max to 0 if max is still unset;
+                        // otherwise a zero min is simply ignored.
+                        if (!maxDistanceOpt.isPresent()) {
+                            maxDistanceOpt = Optional.of(0.0);
+                        }
+                    } else if (!NumberUtils.isNumber(numArg)) {
+                        throw new IllegalArgumentException(numArg + " is not a valid Near function argument.");
+                    } else if (!maxDistanceOpt.isPresent()) {
+                        // No variable identifier, going by order: the first number is the max distance.
+                        maxDistanceOpt = getDistanceOpt(numArg, "maxDistance");
+                    } else {
+                        minDistanceOpt = getDistanceOpt(numArg, "minDistance");
+                    }
+                }
+
+                final Geometry geometry = readQueryGeometry(args[0]);
+                final NearQuery nearQuery = new NearQuery(maxDistanceOpt, minDistanceOpt, geometry);
+                return geoIndexer.queryNear(nearQuery, constraints);
+            }
+
+            /**
+             * Parses a non-negative distance argument.
+             *
+             * @param num the numeric text to parse.
+             * @param name the argument name, used in error messages.
+             * @return the parsed distance.
+             * @throws IllegalArgumentException if {@code num} is not a number (the parse error is kept
+             *             as the cause) or is negative.
+             */
+            private Optional<Double> getDistanceOpt(final String num, final String name) {
+                final double dist;
+                try {
+                    dist = Double.parseDouble(num);
+                } catch (final NumberFormatException nfe) {
+                    throw new IllegalArgumentException("Value for: " + name + " must be a number.", nfe);
+                }
+                if (dist < 0) {
+                    throw new IllegalArgumentException("Value for: " + name + " must be non-negative.");
+                }
+                return Optional.of(dist);
+            }
+
+            @Override
+            public String toString() {
+                return "GEO_NEAR";
+            }
+        };
+
+        /**
+         * The parsed arguments of a {@code GEO_NEAR} search: the query geometry plus an optional
+         * maximum and an optional minimum distance.
+         */
+        public class NearQuery {
+            private final Optional<Double> maxDistanceOpt;
+            private final Optional<Double> minDistanceOpt;
+            private final Geometry geo;
+
+            /**
+             * @param maxDistance the maximum distance, if one was given.
+             * @param minDistance the minimum distance, if one was given.
+             * @param geo the query geometry.
+             */
+            public NearQuery(final Optional<Double> maxDistance, final Optional<Double> minDistance,
+                    final Geometry geo) {
+                maxDistanceOpt = maxDistance;
+                minDistanceOpt = minDistance;
+                this.geo = geo;
+            }
+
+            public Optional<Double> getMaxDistance() {
+                return maxDistanceOpt;
+            }
+
+            public Optional<Double> getMinDistance() {
+                return minDistanceOpt;
+            }
+
+            public Geometry getGeometry() {
+                return geo;
+            }
+        }
+
+        // Registers every supported function URI. This instance initializer runs after the field
+        // initializers above, so all GEO_* fields are already assigned.
+        {
+            SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_EQUALS, GEO_EQUALS);
+            SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_DISJOINT, GEO_DISJOINT);
+            SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_INTERSECTS, GEO_INTERSECTS);
+            SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_TOUCHES, GEO_TOUCHES);
+            SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_CONTAINS, GEO_CONTAINS);
+            SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_OVERLAPS, GEO_OVERLAPS);
+            SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_CROSSES, GEO_CROSSES);
+            SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_WITHIN, GEO_WITHIN);
+            SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_NEAR, GEO_NEAR);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/accumulo/geo/OptionalConfigUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/accumulo/geo/OptionalConfigUtils.java b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/accumulo/geo/OptionalConfigUtils.java
new file mode 100644
index 0000000..bfd39d0
--- /dev/null
+++ b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/accumulo/geo/OptionalConfigUtils.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.accumulo.geo;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.indexing.FilterFunctionOptimizer;
+import org.apache.rya.indexing.GeoEnabledFilterFunctionOptimizer;
+import org.apache.rya.indexing.GeoIndexerType;
+import org.apache.rya.indexing.GeoTemporalIndexerType;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.geotemporal.GeoTemporalOptimizer;
+import org.openrdf.model.URI;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Configuration utilities for the optional geo and geo-temporal indexers. Reads a Hadoop
+ * {@link Configuration} and registers the matching indexer and optimizer class names on it.
+ * <p>
+ * This class will soon be deprecated: use the installer for the set methods and {@link RyaDetails}
+ * for the get methods. New code must separate parameters that are set at Rya install time from those
+ * that are specific to the client. Accumulo index tables are pushed down to the implementation and
+ * are not configured in the conf.
+ */
+public class OptionalConfigUtils extends ConfigUtils {
+    private static final Logger logger = Logger.getLogger(OptionalConfigUtils.class);
+
+
+    public static final String GEO_NUM_PARTITIONS = "sc.geo.numPartitions";
+
+    public static final String USE_GEO = "sc.use_geo";
+    public static final String USE_GEOTEMPORAL = "sc.use_geotemporal";
+    public static final String USE_FREETEXT = "sc.use_freetext";
+    public static final String USE_TEMPORAL = "sc.use_temporal";
+    public static final String USE_ENTITY = "sc.use_entity";
+    public static final String USE_PCJ = "sc.use_pcj";
+    public static final String USE_OPTIMAL_PCJ = "sc.use.optimal.pcj";
+    public static final String USE_PCJ_UPDATER_INDEX = "sc.use.updater";
+    public static final String GEO_PREDICATES_LIST = "sc.geo.predicates";
+    public static final String GEO_INDEXER_TYPE = "sc.geo.geo_indexer_type";
+
+    /** @return the predicate URIs listed under {@link #GEO_PREDICATES_LIST}. */
+    public static Set<URI> getGeoPredicates(final Configuration conf) {
+        return getPredicates(conf, GEO_PREDICATES_LIST);
+    }
+
+    /** @return {@link #GEO_NUM_PARTITIONS}, falling back to the general partition count. */
+    public static int getGeoNumPartitions(final Configuration conf) {
+        return conf.getInt(GEO_NUM_PARTITIONS, getNumPartitions(conf));
+    }
+
+    /** @return {@link #USE_GEO}; {@code false} when unset. */
+    public static boolean getUseGeo(final Configuration conf) {
+        return conf.getBoolean(USE_GEO, false);
+    }
+
+    /** @return {@link #USE_GEOTEMPORAL}; {@code false} when unset. */
+    public static boolean getUseGeoTemporal(final Configuration conf) {
+        return conf.getBoolean(USE_GEOTEMPORAL, false);
+    }
+
+    /**
+     * Retrieves the value for the geo indexer type from the config.
+     *
+     * @param conf the {@link Configuration}.
+     * @return the {@link GeoIndexerType} named by the first value of {@link #GEO_INDEXER_TYPE}, or
+     *         {@code UNSPECIFIED} if the property is unset or empty.
+     * @throws IllegalArgumentException if the property names an unknown {@link GeoIndexerType}
+     *             (fail fast on an invalid configuration).
+     */
+    public static GeoIndexerType getGeoIndexerType(final Configuration conf) {
+        final String[] confType = conf.getStrings(GEO_INDEXER_TYPE, GeoIndexerType.UNSPECIFIED.name());
+        // Configuration.getStrings() returns null (not the default) when the property is set to "".
+        if (confType == null || confType.length == 0) {
+            return GeoIndexerType.UNSPECIFIED;
+        }
+        try {
+            return GeoIndexerType.valueOf(confType[0]);
+        } catch (final IllegalArgumentException e) {
+            // This is where you could allow putting any class name in the configuration.
+            throw new IllegalArgumentException("Configuration contains an unknown GeoIndexerType, found: \""
+                    + GEO_INDEXER_TYPE + "\"=" + confType[0], e);
+        }
+    }
+
+    /**
+     * Registers the geo and geo-temporal indexers and optimizers enabled in {@code conf}, in addition
+     * to those registered by {@link ConfigUtils#setIndexers(RdfCloudTripleStoreConfiguration)}.
+     * Both {@link AccumuloRdfConfiguration#CONF_ADDITIONAL_INDEXERS} and
+     * {@link RdfCloudTripleStoreConfiguration#CONF_OPTIMIZERS} are overwritten with the combined lists.
+     *
+     * @param conf the configuration to read and update.
+     * @throws IllegalArgumentException if {@link #GEO_INDEXER_TYPE} names an unknown type.
+     */
+    public static void setIndexers(final RdfCloudTripleStoreConfiguration conf) {
+        ConfigUtils.setIndexers(conf);
+
+        // Start from what the base class registered. Each list is read and null-checked on its own,
+        // so that a missing optimizer list neither throws nor discards the registered indexers.
+        final List<String> indexList = toMutableList(conf.getStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS));
+        final List<String> optimizers = toMutableList(conf.getStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS));
+
+        // Read unconditionally so an invalid type fails fast even when geo indexing is disabled.
+        final GeoIndexerType geoIndexerType = getGeoIndexerType(conf);
+        final boolean useMongo = ConfigUtils.getUseMongo(conf);
+        boolean useFilterIndex = false;
+
+        if (getUseGeo(conf)) {
+            final GeoIndexerType effectiveType;
+            if (geoIndexerType != GeoIndexerType.UNSPECIFIED) {
+                effectiveType = geoIndexerType;
+            } else if (useMongo) {
+                // Default to MongoGeoIndexer if not specified.
+                effectiveType = GeoIndexerType.MONGO_DB;
+            } else {
+                // Default to GeoMesaGeoIndexer if not specified.
+                effectiveType = GeoIndexerType.GEO_MESA;
+            }
+            indexList.add(effectiveType.getGeoIndexerClassString());
+            useFilterIndex = true;
+        }
+
+        // Only a MongoDB geo-temporal indexer is registered; the flag has no effect on Accumulo.
+        if (useMongo && getUseGeoTemporal(conf)) {
+            indexList.add(GeoTemporalIndexerType.MONGO_GEO_TEMPORAL.getGeoTemporalIndexerClassString());
+            optimizers.add(GeoTemporalIndexerType.MONGO_GEO_TEMPORAL_OPTIMIZER.getGeoTemporalIndexerClassString());
+        }
+
+        if (useFilterIndex) {
+            // Replace the plain filter optimizer with the geo-aware one.
+            optimizers.remove(FilterFunctionOptimizer.class.getName());
+            optimizers.add(GeoEnabledFilterFunctionOptimizer.class.getName());
+        }
+
+        conf.setStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS, indexList.toArray(new String[0]));
+        conf.setStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS, optimizers.toArray(new String[0]));
+    }
+
+    /**
+     * Copies the possibly-null result of {@code Configuration.getStrings()} into a new mutable list.
+     */
+    private static List<String> toMutableList(final String[] values) {
+        return values == null ? Lists.<String>newArrayList() : Lists.newArrayList(values);
+    }
+}
