http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalExternalSetMatcherFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalExternalSetMatcherFactory.java b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalExternalSetMatcherFactory.java new file mode 100644 index 0000000..c4a287e --- /dev/null +++ b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalExternalSetMatcherFactory.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.geotemporal; + +import org.apache.rya.indexing.external.matching.AbstractExternalSetMatcherFactory; +import org.apache.rya.indexing.external.matching.ExternalSetMatcher; +import org.apache.rya.indexing.external.matching.JoinSegment; +import org.apache.rya.indexing.external.matching.JoinSegmentMatcher; +import org.apache.rya.indexing.external.matching.OptionalJoinSegment; +import org.apache.rya.indexing.external.matching.OptionalJoinSegmentMatcher; +import org.apache.rya.indexing.geotemporal.model.EventQueryNode; + +/** + * Factory used to build {@link EntityQueryNodeMatcher}s for the {@link GeoTemporalIndexOptimizer}. + * + */ +public class GeoTemporalExternalSetMatcherFactory extends AbstractExternalSetMatcherFactory<EventQueryNode> { + + @Override + protected ExternalSetMatcher<EventQueryNode> getJoinSegmentMatcher(final JoinSegment<EventQueryNode> segment) { + return new JoinSegmentMatcher<EventQueryNode>(segment, new GeoTemporalToSegmentConverter()); + } + + @Override + protected ExternalSetMatcher<EventQueryNode> getOptionalJoinSegmentMatcher(final OptionalJoinSegment<EventQueryNode> segment) { + return new OptionalJoinSegmentMatcher<EventQueryNode>(segment, new GeoTemporalToSegmentConverter()); + } +}
/**
 * Signals that an operation over the geotemporal index failed to complete.
 */
public class GeoTemporalIndexException extends Exception {
    private static final long serialVersionUID = 1L;

    /**
     * Creates an exception that carries a detail message and the underlying cause.
     * The detail message of {@code cause} is <i>not</i> merged into this
     * exception's own detail message.
     *
     * @param message - The detail message, available later through {@link #getMessage()}.
     * @param cause - The cause, available later through {@link #getCause()}. A
     *   {@code null} value is permitted and means the cause is nonexistent or unknown.
     */
    public GeoTemporalIndexException(final String message, final Throwable cause) {
        super(message, cause);
    }

    /**
     * Creates an exception that carries only a detail message. The cause is left
     * uninitialized and may be supplied afterwards with {@link #initCause}.
     *
     * @param message - The detail message, available later through {@link #getMessage()}.
     */
    public GeoTemporalIndexException(final String message) {
        super(message);
    }
}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.geotemporal; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import org.apache.log4j.Logger; +import org.apache.rya.indexing.IndexingExpr; +import org.apache.rya.indexing.IndexingFunctionRegistry; +import org.apache.rya.indexing.IndexingFunctionRegistry.FUNCTION_TYPE; +import org.apache.rya.indexing.accumulo.geo.GeoParseUtils; +import org.apache.rya.indexing.accumulo.geo.GeoTupleSet; +import org.apache.rya.indexing.external.matching.ExternalSetProvider; +import org.apache.rya.indexing.external.matching.QuerySegment; +import org.apache.rya.indexing.geotemporal.model.EventQueryNode; +import org.apache.rya.indexing.geotemporal.model.EventQueryNode.EventQueryNodeBuilder; +import org.apache.rya.indexing.geotemporal.storage.EventStorage; +import org.openrdf.model.URI; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.query.algebra.FunctionCall; +import org.openrdf.query.algebra.QueryModelNode; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.Var; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; + +/** + * 
Provides {@link GeoTupleSet}s. + */ +public class GeoTemporalIndexSetProvider implements ExternalSetProvider<EventQueryNode> { + private static final Logger LOG = Logger.getLogger(GeoTemporalIndexSetProvider.class); + + //organzied by object var. Each object is a filter, or set of filters + private Multimap<Var, IndexingExpr> filterMap; + + //organzied by subject var. Each subject is a GeoTemporalTupleSet + private Multimap<Var, StatementPattern> patternMap; + + //filters that have not been constrained by statement patterns into indexing expressions yet. + private Multimap<Var, FunctionCall> unmatchedFilters; + //filters that have been used, to be used by the matcher later. + private Multimap<Var, FunctionCall> matchedFilters; + + //organzied by object var. Used to find matches between unmatch filters and patterns + private Map<Var, StatementPattern> objectPatterns; + + + private static URI filterURI; + + private final EventStorage eventStorage; + + public GeoTemporalIndexSetProvider(final EventStorage eventStorage) { + this.eventStorage = requireNonNull(eventStorage); + } + + @Override + public List<EventQueryNode> getExternalSets(final QuerySegment<EventQueryNode> node) { + filterMap = HashMultimap.create(); + patternMap = HashMultimap.create(); + unmatchedFilters = HashMultimap.create(); + matchedFilters = HashMultimap.create(); + + objectPatterns = new HashMap<>(); + //discover entities + buildMaps(node); + final List<EventQueryNode> nodes = createNodes(); + + return nodes; + } + + private List<EventQueryNode> createNodes() { + final List<EventQueryNode> nodes = new ArrayList<>(); + for(final Var subj : patternMap.keySet()) { + final EventQueryNode node = getGeoTemporalNode(subj); + if(node != null) { + nodes.add(node); + } + } + return nodes; + } + + private EventQueryNode getGeoTemporalNode(final Var subj) { + final Collection<StatementPattern> patterns = patternMap.get(subj); + final Collection<FunctionCall> usedFilters = new ArrayList<>(); + 
Optional<StatementPattern> geoPattern = Optional.empty(); + Optional<StatementPattern> temporalPattern = Optional.empty(); + Optional<Collection<IndexingExpr>> geoFilters = Optional.empty(); + Optional<Collection<IndexingExpr>> temporalFilters = Optional.empty(); + + //should only be 2 patterns. + for(final StatementPattern sp : patterns) { + final Var obj = sp.getObjectVar(); + + ///filter map does not have -const- + + + if(filterMap.containsKey(obj)) { + final Collection<IndexingExpr> filters = filterMap.get(obj); + final IndexingFunctionRegistry.FUNCTION_TYPE type = ensureSameType(filters); + if(type != null && type == FUNCTION_TYPE.GEO) { + geoPattern = Optional.of(sp); + geoFilters = Optional.of(filters); + usedFilters.addAll(matchedFilters.get(obj)); + } else if(type != null && type == FUNCTION_TYPE.TEMPORAL) { + temporalPattern = Optional.of(sp); + temporalFilters = Optional.of(filters); + usedFilters.addAll(matchedFilters.get(obj)); + } else { + return null; + } + } else { + return null; + } + } + + if(geoFilters.isPresent() && temporalFilters.isPresent() && geoPattern.isPresent() && temporalPattern.isPresent()) { + return new EventQueryNodeBuilder() + .setStorage(eventStorage) + .setGeoPattern(geoPattern.get()) + .setTemporalPattern(temporalPattern.get()) + .setGeoFilters(geoFilters.get()) + .setTemporalFilters(temporalFilters.get()) + .setUsedFilters(usedFilters) + .build(); + } else { + return null; + } + } + + private static FUNCTION_TYPE ensureSameType(final Collection<IndexingExpr> filters) { + FUNCTION_TYPE type = null; + for(final IndexingExpr filter : filters) { + if(type == null) { + type = IndexingFunctionRegistry.getFunctionType(filter.getFunction()); + } else { + if(IndexingFunctionRegistry.getFunctionType(filter.getFunction()) != type) { + return null; + } + } + } + return type; + } + + private void buildMaps(final QuerySegment<EventQueryNode> node) { + final List<QueryModelNode> unused = new ArrayList<>(); + for (final QueryModelNode pattern 
: node.getOrderedNodes()) { + if(pattern instanceof FunctionCall) { + discoverFilter((FunctionCall) pattern, unused); + } + if(pattern instanceof StatementPattern) { + discoverPatterns((StatementPattern) pattern, unused); + } + } + } + + private void discoverFilter(final FunctionCall filter, final List<QueryModelNode> unmatched) { + try { + filter.visit(new FilterVisitor()); + } catch (final Exception e) { + LOG.error("Failed to match the filter object.", e); + } + } + + private void discoverPatterns(final StatementPattern pattern, final List<QueryModelNode> unmatched) { + final Var subj = pattern.getSubjectVar(); + final Var objVar = pattern.getObjectVar(); + + patternMap.put(subj, pattern); + objectPatterns.put(objVar, pattern); + //check for existing filters. + if(unmatchedFilters.containsKey(objVar)) { + final Collection<FunctionCall> calls = unmatchedFilters.removeAll(objVar); + for(final FunctionCall call : calls) { + addFilter(call); + matchedFilters.put(objVar, call); + } + } + } + + @Override + public Iterator<List<EventQueryNode>> getExternalSetCombos(final QuerySegment<EventQueryNode> segment) { + final List<List<EventQueryNode>> comboList = new ArrayList<>(); + comboList.add(getExternalSets(segment)); + return comboList.iterator(); + } + + private void addFilter(final FunctionCall call) { + filterURI = new URIImpl(call.getURI()); + final Var objVar = IndexingFunctionRegistry.getResultVarFromFunctionCall(filterURI, call.getArgs()); + filterMap.put(objVar, new IndexingExpr(filterURI, objectPatterns.get(objVar), GeoParseUtils.extractArguments(objVar.getName(), call))); + } + + /** + * Finds the object/function in a Filter. If the associated statement pattern + * has been found, creates the {@link IndexingExpr} and adds it to the map. 
+ */ + private class FilterVisitor extends QueryModelVisitorBase<Exception> { + @Override + public void meet(final FunctionCall call) throws Exception { + filterURI = new URIImpl(call.getURI()); + final FUNCTION_TYPE type = IndexingFunctionRegistry.getFunctionType(filterURI); + if(type == FUNCTION_TYPE.GEO || type == FUNCTION_TYPE.TEMPORAL) { + final Var objVar = IndexingFunctionRegistry.getResultVarFromFunctionCall(filterURI, call.getArgs()); + if(objectPatterns.containsKey(objVar)) { + filterMap.put(objVar, new IndexingExpr(filterURI, objectPatterns.get(objVar), GeoParseUtils.extractArguments(objVar.getName(), call))); + matchedFilters.put(objVar, call); + } else { + unmatchedFilters.put(objVar, call); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexer.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexer.java b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexer.java new file mode 100644 index 0000000..106588b --- /dev/null +++ b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexer.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.geotemporal; + +import org.apache.hadoop.conf.Configuration; +import org.apache.rya.api.persist.index.RyaSecondaryIndexer; +import org.apache.rya.indexing.GeoConstants; +import org.apache.rya.indexing.geotemporal.storage.EventStorage; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.model.impl.URIImpl; + +/** + * A repository to store, index, and retrieve {@link Statement}s based on geotemporal features. + */ +public interface GeoTemporalIndexer extends RyaSecondaryIndexer { + /** + * initialize after setting configuration. + */ + public void init(); + + /** + * Creates the {@link Eventtorage} that will be used by the indexer. + * + * @param conf - Indicates how the {@link EventStorage} is initialized. (not null) + * @return The {@link EventStorage} that will be used by this indexer. + */ + public abstract EventStorage getEventStorage(final Configuration conf); + + /** + * Used to indicate which geo filter functions to use in a query. + */ + public static enum GeoPolicy { + /** + * The provided geo object equals the geo object where the event took place. + */ + EQUALS(GeoConstants.GEO_SF_EQUALS), + + /** + * The provided geo object does not share any space with the event. + */ + DISJOINT(GeoConstants.GEO_SF_DISJOINT), + + /** + * The provided geo object shares some amount of space with the event. + */ + INTERSECTS(GeoConstants.GEO_SF_INTERSECTS), + + /** + * The provided geo object shares a point with the event, but only on the edge. 
+ */ + TOUCHES(GeoConstants.GEO_SF_TOUCHES), + + /** + * The provided geo object shares some, but not all space with the event. + */ + CROSSES(GeoConstants.GEO_SF_CROSSES), + + /** + * The provided geo object exists completely within the event. + */ + WITHIN(GeoConstants.GEO_SF_WITHIN), + + /** + * The event took place completely within the provided geo object. + */ + CONTAINS(GeoConstants.GEO_SF_CONTAINS), + + /** + * The provided geo object has some but not all points in common with the event, + * are of the same dimension, and the intersection of the interiors has the + * same dimension as the geometries themselves. + */ + OVERLAPS(GeoConstants.GEO_SF_OVERLAPS); + + private final URI uri; + + private GeoPolicy(final URI uri) { + this.uri = uri; + } + + public URI getURI() { + return uri; + } + + public static GeoPolicy fromURI(final URI uri) { + for(final GeoPolicy policy : GeoPolicy.values()) { + if(policy.getURI().equals(uri)) { + return policy; + } + } + return null; + } + } + + static final String TEMPORAL_NS = "tag:rya-rdf.org,2015:temporal#"; + /** + * Used to indicate which temporal filter functions to use in a query. + */ + public enum TemporalPolicy { + /** + * The provided instant in time equals the instant the event took place. + */ + INSTANT_EQUALS_INSTANT(true, new URIImpl(TEMPORAL_NS+"equals")), + + /** + * The provided instant in time was before when the event took place. + */ + INSTANT_BEFORE_INSTANT(true, new URIImpl(TEMPORAL_NS+"before")), + + /** + * The provided instant in time was after when the event took place. + */ + INSTANT_AFTER_INSTANT(true, new URIImpl(TEMPORAL_NS+"after")), + + /** + * The provided instant in time was before a time period. + */ + INSTANT_BEFORE_INTERVAL(false, new URIImpl(TEMPORAL_NS+"beforeInterval")), + + /** + * The provided instant in time took place within a set of time. 
+ */ + INSTANT_IN_INTERVAL(false, new URIImpl(TEMPORAL_NS+"insideInterval")), + + /** + * The provided instant in time took place after a time period. + */ + INSTANT_AFTER_INTERVAL(false, new URIImpl(TEMPORAL_NS+"afterInterval")), + + /** + * The provided instant in time equals the start of the interval in which the event took place. + */ + INSTANT_START_INTERVAL(false, new URIImpl(TEMPORAL_NS+"hasBeginningInterval")), + + /** + * The provided instant in time equals the end of the interval in which the event took place. + */ + INSTANT_END_INTERVAL(false, new URIImpl(TEMPORAL_NS+"hasEndInterval")), + + /** + * The provided interval equals the interval in which the event took place. + */ + INTERVAL_EQUALS(false, new URIImpl(TEMPORAL_NS+"intervalEquals")), + + /** + * The provided interval is before the interval in which the event took place. + */ + INTERVAL_BEFORE(false, new URIImpl(TEMPORAL_NS+"intervalBefore")), + + /** + * The provided interval is after the interval in which the event took place. 
+ */ + INTERVAL_AFTER(false, new URIImpl(TEMPORAL_NS+"intervalAfter")); + + private final boolean isInstant; + private final URI uri; + + TemporalPolicy(final boolean isInstant, final URI uri) { + this.isInstant = isInstant; + this.uri = uri; + } + + public boolean isInstant(){ + return isInstant; + } + + public URI getURI() { + return uri; + } + + public static TemporalPolicy fromURI(final URI uri) { + for(final TemporalPolicy policy : TemporalPolicy.values()) { + if(policy.getURI().equals(uri)) { + return policy; + } + } + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexerFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexerFactory.java b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexerFactory.java new file mode 100644 index 0000000..f4df8bc --- /dev/null +++ b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexerFactory.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.geotemporal; + +import org.apache.hadoop.conf.Configuration; +import org.apache.rya.indexing.GeoEnabledFilterFunctionOptimizer; +import org.apache.rya.indexing.GeoIndexer; +import org.apache.rya.indexing.GeoIndexerType; +import org.apache.rya.indexing.GeoTemporalIndexerType; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; +import org.apache.rya.mongodb.MongoSecondaryIndex; + +/** + * Factory for retrieving a {@link GeoTemporalIndexer} based on a provided {@link Configuration}. + */ +public class GeoTemporalIndexerFactory { + /** + * Creates and returns a {@link GeoTemporalIndexer}. + * @param conf - The {@link Configuration} to base the {@link GeoTemporalIndexer} on. + * @return The created {@link GeoTemporalIndexer}. + */ + public GeoTemporalIndexer getIndexer(final Configuration conf) { + if(ConfigUtils.getUseMongo(conf)) { + final MongoDBRdfConfiguration config = new MongoDBRdfConfiguration(conf); + for(final MongoSecondaryIndex index : config.getAdditionalIndexers()) { + if(index instanceof GeoTemporalIndexer) { + return (GeoTemporalIndexer) index; + } + } + /* Created a MongoGeoTemporalIndexer */ + final GeoTemporalIndexer index = GeoEnabledFilterFunctionOptimizer.instantiate(GeoTemporalIndexerType.MONGO_GEO_TEMPORAL.getGeoTemporalIndexerClassString(), GeoTemporalIndexer.class); + index.setConf(conf); + index.init(); + return index; + } else { + //TODO: add Accumulo here. 
+ return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalOptimizer.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalOptimizer.java b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalOptimizer.java new file mode 100644 index 0000000..d626adc --- /dev/null +++ b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalOptimizer.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.geotemporal; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.rya.indexing.external.matching.AbstractExternalSetOptimizer; +import org.apache.rya.indexing.external.matching.ExternalSetMatcher; +import org.apache.rya.indexing.external.matching.ExternalSetProvider; +import org.apache.rya.indexing.external.matching.QueryNodeListRater; +import org.apache.rya.indexing.external.matching.QuerySegment; +import org.apache.rya.indexing.geotemporal.model.EventQueryNode; + +import com.google.common.base.Optional; + + +public class GeoTemporalOptimizer extends AbstractExternalSetOptimizer<EventQueryNode> implements Configurable { + private static final GeoTemporalExternalSetMatcherFactory MATCHER_FACTORY = new GeoTemporalExternalSetMatcherFactory(); + + private GeoTemporalIndexer indexer; + private GeoTemporalIndexSetProvider provider; + private Configuration conf; + + @Override + public void setConf(final Configuration conf) { + this.conf = conf; + final GeoTemporalIndexerFactory factory = new GeoTemporalIndexerFactory(); + indexer = factory.getIndexer(conf); + + //conf here does not matter since EventStorage has already been set in the indexer. 
+ provider = new GeoTemporalIndexSetProvider(indexer.getEventStorage(conf)); + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + protected ExternalSetMatcher<EventQueryNode> getMatcher(final QuerySegment<EventQueryNode> segment) { + return MATCHER_FACTORY.getMatcher(segment); + } + + @Override + protected ExternalSetProvider<EventQueryNode> getProvider() { + return provider; + } + + @Override + protected Optional<QueryNodeListRater> getNodeListRater(final QuerySegment<EventQueryNode> segment) { + return null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalToSegmentConverter.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalToSegmentConverter.java b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalToSegmentConverter.java new file mode 100644 index 0000000..22bfdb1 --- /dev/null +++ b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalToSegmentConverter.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.geotemporal; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.rya.indexing.external.matching.ExternalSetConverter; +import org.apache.rya.indexing.external.matching.JoinSegment; +import org.apache.rya.indexing.external.matching.QuerySegment; +import org.apache.rya.indexing.geotemporal.model.EventQueryNode; +import org.openrdf.query.algebra.Filter; +import org.openrdf.query.algebra.QueryModelNode; +import org.openrdf.query.algebra.ValueExpr; + +import com.google.common.base.Preconditions; + +/** + * Implementation of {@link ExternalSetConverter} to convert {@link EventQueryNode}s + * to {@link QuerySegment}s. 
+ * + */ +public class GeoTemporalToSegmentConverter implements ExternalSetConverter<EventQueryNode> { + @Override + public QuerySegment<EventQueryNode> setToSegment(final EventQueryNode set) { + Preconditions.checkNotNull(set); + final Set<QueryModelNode> matched = new HashSet<>(set.getPatterns()); + matched.addAll(set.getFilters()); + final List<QueryModelNode> unmatched = new ArrayList<>(set.getPatterns()); + return new JoinSegment<EventQueryNode>(matched, unmatched, new HashMap<ValueExpr, Filter>()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/model/Event.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/model/Event.java b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/model/Event.java new file mode 100644 index 0000000..4c50bfb --- /dev/null +++ b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/model/Event.java @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.geotemporal.model; + +import static java.util.Objects.requireNonNull; + +import java.util.Objects; +import java.util.Optional; + +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.indexing.TemporalInstant; +import org.apache.rya.indexing.TemporalInterval; +import org.apache.rya.indexing.geotemporal.GeoTemporalIndexer; + +import com.vividsolutions.jts.geom.Geometry; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +
/**
 * Query object for a {@link GeoTemporalIndexer}.
 * Defines a {@link Geometry}, either a {@link TemporalInstant} or
 * {@link TemporalInterval}, and a triple Subject.
 * <p>
 * Instances are created through {@link #builder()} or {@link #builder(Event)}.
 */
public class Event {
    private final Optional<Geometry> geometry;
    private final Optional<TemporalInstant> instant;
    private final Optional<TemporalInterval> interval;
    private final RyaURI subject;

    private final boolean isInstant;

    /**
     * Creates a new {@link Event} query object with a {@link TemporalInstant}.
     * @param geo - The {@link Geometry} to use when querying. (may be null)
     * @param instant - The {@link TemporalInstant} to use when querying. (may be null)
     * @param subject - The Subject that both statements must have when querying. (not null)
     */
    private Event(final Geometry geo, final TemporalInstant instant, final RyaURI subject) {
        this.subject = requireNonNull(subject);

        //these fields are nullable since they are filled field by field.
        this.instant = Optional.ofNullable(instant);
        geometry = Optional.ofNullable(geo);
        isInstant = true;
        interval = Optional.empty();
    }

    /**
     * Creates a new {@link Event} query object with a {@link TemporalInterval}.
     * @param geo - The {@link Geometry} to use when querying. (may be null)
     * @param interval - The {@link TemporalInterval} to use when querying. (may be null)
     * @param subject - The Subject that both statements must have when querying. (not null)
     */
    private Event(final Geometry geo, final TemporalInterval interval, final RyaURI subject) {
        this.subject = requireNonNull(subject);

        //these fields are nullable since they are filled field by field.
        this.interval = Optional.ofNullable(interval);
        geometry = Optional.ofNullable(geo);
        isInstant = false;
        instant = Optional.empty();
    }

    /**
     * @return Whether or not the query object uses a {@link TemporalInstant}.
     */
    public boolean isInstant() {
        return isInstant;
    }

    /**
     * @return The {@link Geometry} to use when querying.
     */
    public Optional<Geometry> getGeometry() {
        return geometry;
    }

    /**
     * @return The {@link TemporalInstant} to use when querying.
     */
    public Optional<TemporalInstant> getInstant() {
        return instant;
    }

    /**
     * @return The {@link TemporalInterval} to use when querying.
     */
    public Optional<TemporalInterval> getInterval() {
        return interval;
    }

    /**
     * @return The statement subject.
     */
    public RyaURI getSubject() {
        return subject;
    }

    /**
     * Hashes the subject, the geometry and whichever temporal value
     * (instant or interval) this event carries.
     */
    @Override
    public int hashCode() {
        if(isInstant) {
            return Objects.hash(subject, geometry, instant);
        } else {
            return Objects.hash(subject, geometry, interval);
        }
    }

    /**
     * Two events are equal when they are of the same temporal kind and have
     * equal subjects, geometries and temporal values.
     * <p>
     * The geometry is part of the comparison because {@link #hashCode()}
     * hashes it; leaving it out would let two equal events produce different
     * hash codes.
     */
    @Override
    public boolean equals(final Object o) {
        if(this == o) {
            return true;
        }
        if(o instanceof Event) {
            final Event event = (Event) o;
            return Objects.equals(subject, event.subject) &&
                    isInstant == event.isInstant &&
                    Objects.equals(geometry, event.geometry) &&
                    (isInstant ? Objects.equals(instant, event.instant) : Objects.equals(interval, event.interval));
        }
        return false;
    }

    /**
     * Creates a {@link Builder} pre-populated with the values of an existing {@link Event}.
     *
     * @param event - The event to copy the subject, geometry and temporal value from. (not null)
     * @return A builder holding the event's values.
     */
    public static Builder builder(final Event event) {
        final Builder builder = new Builder()
            .setSubject(event.getSubject());
        if(event.getGeometry().isPresent()) {
            builder.setGeometry(event.getGeometry().get());
        }
        if(event.isInstant()) {
            if(event.getInstant().isPresent()) {
                builder.setTemporalInstant(event.getInstant().get());
            }
        } else {
            if(event.getInterval().isPresent()) {
                builder.setTemporalInterval(event.getInterval().get());
            }
        }
        return builder;
    }

    /**
     * @return An empty {@link Builder}.
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Builds instances of {@link Event}.
     */
    @DefaultAnnotation(NonNull.class)
    public static class Builder {
        private RyaURI subject;
        private Geometry geo;
        private TemporalInstant instant;
        private TemporalInterval interval;

        /**
         * Sets the {@link RyaURI} subject.
         * @param subject - The subject to key on the event.
         * @return This builder.
         */
        public Builder setSubject(final RyaURI subject) {
            this.subject = subject;
            return this;
        }

        /**
         * Sets the {@link Geometry}.
         * @param geo - The geometry.
         * @return This builder.
         */
        public Builder setGeometry(final Geometry geo) {
            this.geo = geo;
            return this;
        }

        /**
         * Sets the {@link TemporalInterval}.
         * @param interval - The interval.
         * @return This builder.
         */
        public Builder setTemporalInterval(final TemporalInterval interval) {
            this.interval = interval;
            return this;
        }

        /**
         * Sets the {@link TemporalInstant}.
         * @param instant - The instant.
         * @return This builder.
         */
        public Builder setTemporalInstant(final TemporalInstant instant) {
            this.instant = instant;
            return this;
        }

        /**
         * Builds the {@link Event}. When an instant has been set the event is
         * instant based and any interval that was set is ignored; otherwise
         * the event is interval based, even if no interval was set.
         *
         * @return The new {@link Event}.
         * @throws NullPointerException If no subject has been set.
         */
        public Event build() {
            if(instant == null) {
                return new Event(geo, interval, subject);
            } else {
                return new Event(geo, instant, subject);
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/model/EventQueryNode.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/model/EventQueryNode.java b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/model/EventQueryNode.java new file mode 100644 index 0000000..104fca8 --- /dev/null +++ b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/model/EventQueryNode.java @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.rya.indexing.geotemporal.model; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +import org.apache.commons.lang.builder.EqualsBuilder; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.indexing.IndexingExpr; +import org.apache.rya.indexing.TemporalInstant; +import org.apache.rya.indexing.TemporalInstantRfc3339; +import org.apache.rya.indexing.entity.query.EntityQueryNode; +import org.apache.rya.indexing.geotemporal.storage.EventStorage; +import org.apache.rya.indexing.mongodb.update.RyaObjectStorage.ObjectStorageException; +import org.apache.rya.rdftriplestore.evaluation.ExternalBatchingIterator; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.openrdf.model.Value; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.FunctionCall; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.Var; +import org.openrdf.query.algebra.evaluation.impl.ExternalSet; +import org.openrdf.query.algebra.evaluation.iterator.CollectionIteration; +import org.openrdf.query.impl.MapBindingSet; + +import com.vividsolutions.jts.geom.Geometry; + +import info.aduna.iteration.CloseableIteration; +
/**
 * An {@link ExternalSet} that answers one geo {@link StatementPattern} and one
 * temporal {@link StatementPattern} that share a subject by searching an
 * {@link EventStorage} with the geo and temporal filters the node was built with.
 */
public class EventQueryNode extends ExternalSet implements ExternalBatchingIterator {
    private final Collection<FunctionCall> usedFilters;
    private final Collection<IndexingExpr> geoFilters;
    private final Collection<IndexingExpr> temporalFilters;

    private final StatementPattern geoPattern;
    private final StatementPattern temporalPattern;

    //Information about the subject of the patterns.
    private final boolean subjectIsConstant;
    private final Optional<String> subjectVar;
    // Only present when the patterns use a constant subject. A subject supplied
    // through a binding set is resolved per evaluate() call and never stored here.
    private final Optional<String> subjectConstant;

    //since an EventQueryNode exists in a single segment, all binding names are guaranteed to be assured.
    // NOTE(review): nothing in this class adds names to this set, so it is always empty - confirm intended.
    private final Set<String> bindingNames;

    private final Collection<StatementPattern> patterns;

    private final EventStorage eventStore;

    /**
     * Constructs an instance of {@link EventQueryNode}.
     *
     * @param eventStore - The {@link EventStorage} that will be searched to match
     *   {@link BindingSet}s when evaluating a query. (not null)
     * @param geoPattern - The geo {@link StatementPattern} solved by this node. (not null)
     * @param temporalPattern - The temporal {@link StatementPattern} solved by this node. (not null)
     * @param geoFilters - The geo filters handed to the storage search. (not null)
     * @param temporalFilters - The temporal filters handed to the storage search. (not null)
     * @param usedFilters - The filter function calls this node accounts for. (not null)
     * @throws IllegalStateException If the patterns do not share a subject or a
     *   pattern has a variable predicate.
     */
    private EventQueryNode(final EventStorage eventStore, final StatementPattern geoPattern, final StatementPattern temporalPattern, final Collection<IndexingExpr> geoFilters, final Collection<IndexingExpr> temporalFilters, final Collection<FunctionCall> usedFilters) throws IllegalStateException {
        this.geoPattern = requireNonNull(geoPattern);
        this.temporalPattern = requireNonNull(temporalPattern);
        this.geoFilters = requireNonNull(geoFilters);
        this.temporalFilters = requireNonNull(temporalFilters);
        this.eventStore = requireNonNull(eventStore);
        this.usedFilters = requireNonNull(usedFilters);
        bindingNames = new HashSet<>();

        // Built eagerly so the constructor does not have to call an overridable method.
        patterns = new ArrayList<>();
        patterns.add(this.geoPattern);
        patterns.add(this.temporalPattern);

        // Subject based preconditions.
        verifySameSubjects(patterns);
        // Predicate based preconditions.
        verifyAllPredicatesAreConstants(patterns);

        // The Subject may either be constant or a variable.
        final Var subject = patterns.iterator().next().getSubjectVar();
        subjectIsConstant = subject.isConstant();
        if(subjectIsConstant) {
            subjectConstant = Optional.of( subject.getValue().toString() );
            subjectVar = Optional.empty();
        } else {
            subjectConstant = Optional.empty();
            subjectVar = Optional.of( subject.getName() );
        }
    }

    @Override
    public Set<String> getBindingNames() {
        return bindingNames;
    }

    @Override
    public Set<String> getAssuredBindingNames() {
        return bindingNames;
    }

    /**
     * Verify the Subject for all of the patterns is the same.
     *
     * @param patterns - The patterns to check. (not null, not empty)
     * @throws IllegalStateException If all of the Subjects are not the same.
     */
    private static void verifySameSubjects(final Collection<StatementPattern> patterns) throws IllegalStateException {
        requireNonNull(patterns);

        final Iterator<StatementPattern> it = patterns.iterator();
        final Var subject = it.next().getSubjectVar();

        while(it.hasNext()) {
            final StatementPattern pattern = it.next();
            if(!pattern.getSubjectVar().equals(subject)) {
                throw new IllegalStateException("At least one of the patterns has a different subject from the others. " +
                        "All subjects must be the same.");
            }
        }
    }

    /**
     * Verifies all of the Statement Patterns have Constants for their predicates.
     *
     * @param patterns - The patterns to check. (not null)
     * @throws IllegalStateException A pattern has a variable predicate.
     */
    private static void verifyAllPredicatesAreConstants(final Collection<StatementPattern> patterns) throws IllegalStateException {
        requireNonNull(patterns);

        for(final StatementPattern pattern : patterns) {
            if(!pattern.getPredicateVar().isConstant()) {
                throw new IllegalStateException("The Predicate of a Statement Pattern must be constant. Pattern: " + pattern);
            }
        }
    }

    /**
     * Searches the {@link EventStorage} and turns every {@link Event} found into
     * a {@link BindingSet} that binds the geo pattern's object variable to the
     * event's geometry text and the temporal pattern's object variable to the
     * event's instant or interval, when those values are present.
     * <p>
     * The search is restricted to a subject when the patterns use a constant
     * subject or when {@code bindings} binds the subject variable. The subject
     * taken from {@code bindings} is used for this call only.
     *
     * @param bindings - The bindings to evaluate against. (not null)
     * @return An iteration over the resulting {@link BindingSet}s.
     * @throws QueryEvaluationException The storage search failed.
     */
    @Override
    public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final BindingSet bindings) throws QueryEvaluationException {
        final List<BindingSet> list = new ArrayList<>();
        try {
            // Resolve the subject for this call only. Writing it back to the node
            // would make later evaluations search with a stale subject.
            Optional<String> subject = subjectConstant;
            if(!subject.isPresent() && bindings.hasBinding(subjectVar.get())) {
                subject = Optional.of(bindings.getValue(subjectVar.get()).stringValue());
            } else if(bindings.size() != 0) {
                // NOTE(review): the incoming bindings are emitted as a result of their own
                // rather than being merged into each event's result - confirm this is intended.
                list.add(bindings);
            }

            final Optional<RyaURI> subjectUri = subject.isPresent()
                    ? Optional.of(new RyaURI(subject.get()))
                    : Optional.<RyaURI>empty();
            final Collection<Event> searchEvents = eventStore.search(subjectUri, Optional.of(geoFilters), Optional.of(temporalFilters));

            for(final Event event : searchEvents) {
                final MapBindingSet resultSet = new MapBindingSet();
                if(event.getGeometry().isPresent()) {
                    final Geometry geo = event.getGeometry().get();
                    final Value geoValue = ValueFactoryImpl.getInstance().createLiteral(geo.toText());
                    final Var geoObj = geoPattern.getObjectVar();
                    resultSet.addBinding(geoObj.getName(), geoValue);
                }

                final Value temporalValue;
                if(event.isInstant() && event.getInstant().isPresent()) {
                    final Optional<TemporalInstant> opt = event.getInstant();
                    DateTime dt = opt.get().getAsDateTime();
                    dt = dt.toDateTime(DateTimeZone.UTC);
                    final String str = dt.toString(TemporalInstantRfc3339.FORMATTER);
                    temporalValue = ValueFactoryImpl.getInstance().createLiteral(str);
                } else if(event.getInterval().isPresent()) {
                    temporalValue = ValueFactoryImpl.getInstance().createLiteral(event.getInterval().get().getAsPair());
                } else {
                    temporalValue = null;
                }

                if(temporalValue != null) {
                    final Var temporalObj = temporalPattern.getObjectVar();
                    resultSet.addBinding(temporalObj.getName(), temporalValue);
                }
                list.add(resultSet);
            }
        } catch (final ObjectStorageException e) {
            throw new QueryEvaluationException("Failed to evaluate the binding set", e);
        }
        return new CollectionIteration<>(list);
    }

    /**
     * @return The geo filters handed to the storage search.
     */
    public Collection<IndexingExpr> getGeoFilters() {
        return geoFilters;
    }

    /**
     * @return The temporal filters handed to the storage search.
     */
    public Collection<IndexingExpr> getTemporalFilters() {
        return temporalFilters;
    }

    /**
     * @return The filter function calls this node was built with.
     */
    public Collection<FunctionCall> getFilters() {
        return usedFilters;
    }

    /**
     * @return The geo pattern followed by the temporal pattern.
     */
    public Collection<StatementPattern> getPatterns() {
        return patterns;
    }

    /**
     * Hashes a subset of the fields compared by {@link #equals(Object)}. The
     * event storage is left out because {@code equals} does not compare it.
     */
    @Override
    public int hashCode() {
        return Objects.hash(subjectIsConstant,
                subjectVar,
                geoFilters,
                temporalFilters,
                geoPattern,
                temporalPattern,
                bindingNames);
    }

    @Override
    public boolean equals(final Object other) {
        if(other instanceof EventQueryNode) {
            final EventQueryNode otherNode = (EventQueryNode)other;
            return new EqualsBuilder()
                .append(subjectIsConstant, otherNode.subjectIsConstant)
                .append(subjectVar, otherNode.subjectVar)
                .append(geoFilters, otherNode.geoFilters)
                .append(geoPattern, otherNode.geoPattern)
                .append(temporalFilters, otherNode.temporalFilters)
                .append(temporalPattern, otherNode.temporalPattern)
                .append(bindingNames, otherNode.bindingNames)
                .append(subjectConstant, otherNode.subjectConstant)
                .isEquals();
        }
        return false;
    }

    /**
     * @return A new node built from the same storage, patterns and filters.
     */
    @Override
    public EventQueryNode clone() {
        return new EventQueryNode(eventStore, geoPattern, temporalPattern, geoFilters, temporalFilters, usedFilters);
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder();
        sb.append("Geo Pattern: ").append(geoPattern.toString());
        sb.append("\n--Geo Filters--\n");
        for(final IndexingExpr filter : geoFilters) {
            sb.append(filter.toString());
            sb.append("\n");
        }
        sb.append("\n-------------------\n");
        sb.append("Temporal Pattern: ").append(temporalPattern.toString());
        sb.append("\n--Temporal Filters--\n");
        for(final IndexingExpr filter : temporalFilters) {
            sb.append(filter.toString());
            sb.append("\n");
        }
        return sb.toString();
    }

    /**
     * Evaluates every {@link BindingSet} of the batch with
     * {@link #evaluate(BindingSet)} and returns the results concatenated in
     * batch order. A null or empty batch yields an empty iteration.
     *
     * @param bindingset - The batch of bindings to evaluate.
     * @return An iteration over the results of all of the evaluations.
     * @throws QueryEvaluationException One of the evaluations failed.
     */
    @Override
    public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final Collection<BindingSet> bindingset)
            throws QueryEvaluationException {
        final List<BindingSet> results = new ArrayList<>();
        if(bindingset != null) {
            for(final BindingSet bindings : bindingset) {
                final CloseableIteration<BindingSet, QueryEvaluationException> iter = evaluate(bindings);
                try {
                    while(iter.hasNext()) {
                        results.add(iter.next());
                    }
                } finally {
                    iter.close();
                }
            }
        }
        return new CollectionIteration<>(results);
    }

    /**
     * Builder for {@link EventQueryNode}s.
     */
    public static class EventQueryNodeBuilder {
        private EventStorage store;
        private StatementPattern geoPattern;
        private StatementPattern temporalPattern;
        private Collection<IndexingExpr> geoFilters;
        private Collection<IndexingExpr> temporalFilters;
        private Collection<FunctionCall> usedFilters;

        /**
         * @param store - The {@link EventStorage} to use in the {@link EventQueryNode}
         * @return - The Builder.
         */
        public EventQueryNodeBuilder setStorage(final EventStorage store) {
            this.store = store;
            return this;
        }

        /**
         * @param geoPattern - The geo {@link StatementPattern} to use in the {@link EventQueryNode}
         * @return - The Builder.
         */
        public EventQueryNodeBuilder setGeoPattern(final StatementPattern geoPattern) {
            this.geoPattern = geoPattern;
            return this;
        }

        /**
         * @param temporalPattern - The temporal {@link StatementPattern} to use in the {@link EventQueryNode}
         * @return - The Builder.
         */
        public EventQueryNodeBuilder setTemporalPattern(final StatementPattern temporalPattern) {
            this.temporalPattern = temporalPattern;
            return this;
        }

        /**
         * @param geoFilters - The geo filter(s) {@link IndexingExpr} to use in the {@link EventQueryNode}
         * @return - The Builder.
         */
        public EventQueryNodeBuilder setGeoFilters(final Collection<IndexingExpr> geoFilters) {
            this.geoFilters = geoFilters;
            return this;
        }

        /**
         * @param temporalFilters - The temporal filter(s) {@link IndexingExpr} to use in the {@link EventQueryNode}
         * @return - The Builder.
         */
        public EventQueryNodeBuilder setTemporalFilters(final Collection<IndexingExpr> temporalFilters) {
            this.temporalFilters = temporalFilters;
            return this;
        }

        /**
         * @param usedFilters - The filter(s) used by the {@link EventQueryNode}
         * @return - The Builder.
         */
        public EventQueryNodeBuilder setUsedFilters(final Collection<FunctionCall> usedFilters) {
            this.usedFilters = usedFilters;
            return this;
        }

        /**
         * @return The {@link EventQueryNode} built by the builder.
         * @throws NullPointerException If the storage, a pattern or a filter collection was not set.
         * @throws IllegalStateException If the patterns do not share a subject or a
         *   pattern has a variable predicate.
         */
        public EventQueryNode build() {
            return new EventQueryNode(store, geoPattern, temporalPattern, geoFilters, temporalFilters, usedFilters);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/storage/EventStorage.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/storage/EventStorage.java b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/storage/EventStorage.java new file mode 100644 index 0000000..47c18a0 --- /dev/null +++ b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/storage/EventStorage.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.geotemporal.storage; + +import java.util.Collection; +import java.util.Optional; + +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.indexing.IndexingExpr; +import org.apache.rya.indexing.geotemporal.GeoTemporalIndexer; +import org.apache.rya.indexing.geotemporal.model.Event; +import org.apache.rya.indexing.mongodb.update.RyaObjectStorage; +
/**
 * A {@link RyaObjectStorage} of {@link Event}s that can additionally be
 * searched by subject, geo filters and temporal filters.
 */
public interface EventStorage extends RyaObjectStorage<Event> {
    /**
     * Searches the storage for {@link Event}s.
     * Will query based on present parameters.
     *
     * @param subject - The subject key to find events.
     * @param geoFilters - The geo filters to find Events.
     * @param temporalFilters - The temporal filters to find Events.
     * @return The {@link Event}s found by the search.
     * @throws ObjectStorageException A problem occurred while fetching the Events from the storage.
     */
    Collection<Event> search(Optional<RyaURI> subject, Optional<Collection<IndexingExpr>> geoFilters, Optional<Collection<IndexingExpr>> temporalFilters) throws ObjectStorageException;

    /**
     * Indicates a problem while interacting with an {@link EventStorage}.
     */
    class EventStorageException extends ObjectStorageException {
        private static final long serialVersionUID = 1L;

        /**
         * Constructs a new exception with the specified detail message.
         *
         * @param message - The detail message, saved for later retrieval
         *   by the {@link #getMessage()} method.
         */
        public EventStorageException(final String message) {
            super(message);
        }

        /**
         * Constructs a new exception with the specified detail message and cause.
         *
         * @param message - The detail message, saved for later retrieval
         *   by the {@link #getMessage()} method.
         * @param cause - The cause, saved for later retrieval by the
         *   {@link #getCause()} method.
         */
        public EventStorageException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * An {@link Event} could not be created because one already exists for the Subject.
     */
    class EventAlreadyExistsException extends EventStorageException {
        private static final long serialVersionUID = 1L;

        /**
         * @param message - The detail message.
         */
        public EventAlreadyExistsException(final String message) {
            super(message);
        }

        /**
         * @param message - The detail message.
         * @param cause - The cause.
         */
        public EventAlreadyExistsException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * An {@link Event} could not be updated because the old state does not
     * match the current state.
     */
    class StaleUpdateException extends EventStorageException {
        private static final long serialVersionUID = 1L;

        /**
         * @param message - The detail message.
         */
        public StaleUpdateException(final String message) {
            super(message);
        }

        /**
         * @param message - The detail message.
         * @param cause - The cause.
         */
        public StaleUpdateException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * A {@link EventFilter} is a translation from an {@link IndexingExpr}
     * to a format the {@link GeoTemporalIndexer} can use to easily determine which
     * filter function is being used.
     *
     * @param <T> - The type of the query object returned by {@link #getQueryObject()}.
     */
    interface EventFilter<T> {
        /**
         * Gets the translated query friendly form of the filter.
         *
         * @return The query object.
         */
        T getQueryObject();
    }

    /**
     * Factory for getting the {@link EventFilter} from an {@link IndexingExpr}.
     *
     * @param <T> - The type of query object the produced {@link EventFilter}s return.
     */
    interface EventFilterFactory<T> {
        /**
         * @param filter - The {@link IndexingExpr} to translate.
         * @return The {@link EventFilter} for the filter.
         */
        EventFilter<T> getSearchFunction(IndexingExpr filter);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.geomesa/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/geo.geomesa/pom.xml b/extras/rya.geoindexing/geo.geomesa/pom.xml new file mode 100644 index 0000000..ebadd36 --- /dev/null +++ b/extras/rya.geoindexing/geo.geomesa/pom.xml @@ -0,0 +1,51 @@ +<?xml version='1.0'?> + +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + you under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied.
See the License for the specific + language governing permissions and limitations under the License. --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.geoindexing</artifactId> + <version>3.2.11-incubating-SNAPSHOT</version> + </parent> + <artifactId>geo.geomesa</artifactId> + <name>Apache Rya Geo Indexing using GeoMesa</name> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <geotools.version>14.3</geotools.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>geo.common</artifactId> + <version>3.2.11-incubating-SNAPSHOT</version> + </dependency> + + <dependency> + <groupId>org.locationtech.geomesa</groupId> + <artifactId>geomesa-accumulo-datastore_2.11</artifactId> + </dependency> + <dependency> + <groupId>org.geotools.xsd</groupId> + <artifactId>gt-xsd-gml3</artifactId> + <version>${geotools.version}</version> + </dependency> + <dependency> + <groupId>org.geotools</groupId> + <artifactId>gt-api</artifactId> + <version>${geotools.version}</version> + </dependency> + </dependencies> + </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.geomesa/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoMesaGeoIndexer.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/geo.geomesa/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoMesaGeoIndexer.java b/extras/rya.geoindexing/geo.geomesa/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoMesaGeoIndexer.java new file mode 100644 index 0000000..02ef5ba --- /dev/null +++ 
b/extras/rya.geoindexing/geo.geomesa/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoMesaGeoIndexer.java @@ -0,0 +1,519 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.accumulo.geo; + +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; +import org.apache.rya.accumulo.experimental.AbstractAccumuloIndexer; +import org.apache.rya.api.RdfCloudTripleStoreConfiguration; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.resolver.RyaToRdfConversions; +import org.apache.rya.indexing.GeoIndexer; +import org.apache.rya.indexing.Md5Hash; +import 
org.apache.rya.indexing.StatementConstraints;
import org.apache.rya.indexing.StatementSerializer;
import org.apache.rya.indexing.accumulo.ConfigUtils;
import org.apache.rya.indexing.accumulo.geo.GeoTupleSet.GeoSearchFunctionFactory.NearQuery;
import org.geotools.data.DataStore;
import org.geotools.data.DataStoreFinder;
import org.geotools.data.DataUtilities;
import org.geotools.data.FeatureSource;
import org.geotools.data.FeatureStore;
import org.geotools.data.Query;
import org.geotools.factory.CommonFactoryFinder;
import org.geotools.factory.Hints;
import org.geotools.feature.DefaultFeatureCollection;
import org.geotools.feature.FeatureIterator;
import org.geotools.feature.SchemaException;
import org.geotools.feature.simple.SimpleFeatureBuilder;
import org.geotools.filter.text.cql2.CQLException;
import org.geotools.filter.text.ecql.ECQL;
import org.locationtech.geomesa.accumulo.index.Constants;
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.simple.SimpleFeatureType;
import org.opengis.filter.Filter;
import org.opengis.filter.FilterFactory;
import org.opengis.filter.identity.Identifier;
import org.openrdf.model.Literal;
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
import org.openrdf.query.QueryEvaluationException;

import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.io.ParseException;

import info.aduna.iteration.CloseableIteration;

/**
 * A {@link GeoIndexer} wrapper around a GeoMesa {@link AccumuloDataStore}. This class configures and connects to the Datastore, creates the
 * RDF Feature Type, and interacts with the Datastore.
 * <p>
 * Specifically, this class creates a RDF Feature type and stores each RDF Statement as a RDF Feature in the datastore. Each feature
 * contains the standard set of GeoMesa attributes (Geometry, Start Date, and End Date). The GeoMesaGeoIndexer populates the Geometry
 * attribute by parsing the Well-Known Text contained in the RDF Statement's object literal value.
 * <p>
 * The RDF Feature contains four additional attributes for each component of the RDF Statement. These attributes are:
 * <p>
 * <table border="1">
 * <tr>
 * <th>Name</th>
 * <th>Symbol</th>
 * <th>Type</th>
 * </tr>
 * <tr>
 * <td>Subject Attribute</td>
 * <td>S</td>
 * <td>String</td>
 * </tr>
 * <tr>
 * <td>Predicate Attribute</td>
 * <td>P</td>
 * <td>String</td>
 * </tr>
 * <tr>
 * <td>Object Attribute</td>
 * <td>O</td>
 * <td>String</td>
 * </tr>
 * <tr>
 * <td>Context Attribute</td>
 * <td>C</td>
 * <td>String</td>
 * </tr>
 * </table>
 */
public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoIndexer {

    private static final String TABLE_SUFFIX = "geo";

    private static final Logger logger = Logger.getLogger(GeoMesaGeoIndexer.class);

    private static final String FEATURE_NAME = "RDF";

    private static final String SUBJECT_ATTRIBUTE = "S";
    private static final String PREDICATE_ATTRIBUTE = "P";
    private static final String OBJECT_ATTRIBUTE = "O";
    private static final String CONTEXT_ATTRIBUTE = "C";
    private static final String GEOMETRY_ATTRIBUTE = Constants.SF_PROPERTY_GEOMETRY;

    private Set<URI> validPredicates;
    private Configuration conf;
    private FeatureStore<SimpleFeatureType, SimpleFeature> featureStore;
    private FeatureSource<SimpleFeatureType, SimpleFeature> featureSource;
    private SimpleFeatureType featureType;
    private boolean isInit = false;

    /**
     * Stores the configuration and, on the first call only, initializes the index from it.
     * <p>
     * Initialization occurs here because the index is created using reflection.
     *
     * @param conf - The configuration to use.
     * @throws RuntimeException if initialization fails with an {@link IOException}.
     */
    @Override
    public void setConf(final Configuration conf) {
        this.conf = conf;
        if (!isInit) {
            try {
                initInternal();
                isInit = true;
            } catch (final IOException e) {
                logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * @return The configuration most recently passed to {@link #setConf(Configuration)}.
     */
    @Override
    public Configuration getConf() {
        return conf;
    }

    /**
     * Reads the geo predicates from the configuration, opens the data store, resolves (or creates) the RDF
     * feature type and obtains the feature source/store used by every other operation.
     *
     * @throws IOException if the data store cannot be opened or the feature type cannot be resolved.
     * @throws IllegalStateException if the feature source is not a writable {@link FeatureStore}.
     */
    private void initInternal() throws IOException {
        validPredicates = ConfigUtils.getGeoPredicates(conf);

        final DataStore dataStore = createDataStore(conf);

        try {
            featureType = getStatementFeatureType(dataStore);
        } catch (final SchemaException e) {
            // An IOException is already the declared failure type, so only the schema failure is wrapped.
            throw new IOException(e);
        }

        featureSource = dataStore.getFeatureSource(featureType.getName());
        if (!(featureSource instanceof FeatureStore)) {
            throw new IllegalStateException("Could not retrieve feature store");
        }
        featureStore = (FeatureStore<SimpleFeatureType, SimpleFeature>) featureSource;
    }

    /**
     * Builds the data store connection parameters from the configuration and asks the
     * {@link DataStoreFinder} for a matching data store.
     *
     * @param conf - The configuration holding the connection settings.
     * @return The data store; never null.
     * @throws IOException if the finder fails or returns no data store for the parameters.
     */
    private static DataStore createDataStore(final Configuration conf) throws IOException {
        // get the configuration parameters
        final Instance instance = ConfigUtils.getInstance(conf);
        final boolean useMock = instance instanceof MockInstance;
        final String instanceId = instance.getInstanceName();
        final String zookeepers = instance.getZooKeepers();
        final String user = ConfigUtils.getUsername(conf);
        final String password = ConfigUtils.getPassword(conf);
        final String auths = ConfigUtils.getAuthorizations(conf).toString();
        final String tableName = getTableName(conf);
        final int numPartitions = OptionalConfigUtils.getGeoNumPartitions(conf);

        final String featureSchemaFormat = "%~#s%" + numPartitions + "#r%" + FEATURE_NAME
                + "#cstr%0,3#gh%yyyyMMdd#d::%~#s%3,2#gh::%~#s%#id";

        // build the map of parameters
        final Map<String, Serializable> params = new HashMap<>();
        params.put("instanceId", instanceId);
        params.put("zookeepers", zookeepers);
        params.put("user", user);
        params.put("password", password);
        params.put("auths", auths);
        params.put("tableName", tableName);
        params.put("indexSchemaFormat", featureSchemaFormat);
        params.put("useMock", Boolean.toString(useMock));

        // fetch the data store from the finder; it reports "nothing matched" with null
        final DataStore dataStore = DataStoreFinder.getDataStore(params);
        if (dataStore == null) {
            throw new IOException("Could not find a data store for table '" + tableName
                    + "' on instance '" + instanceId + "'");
        }
        return dataStore;
    }

    /**
     * Returns the RDF feature type of the data store, creating its schema first when the data store
     * does not list it yet.
     *
     * @param dataStore - The data store to read the schema from or create it in.
     * @return The RDF feature type.
     * @throws IOException if the data store cannot be read or written.
     * @throws SchemaException declared for callers; see the schema creation call.
     */
    private static SimpleFeatureType getStatementFeatureType(final DataStore dataStore) throws IOException, SchemaException {
        final String[] datastoreFeatures = dataStore.getTypeNames();
        if (Arrays.asList(datastoreFeatures).contains(FEATURE_NAME)) {
            return dataStore.getSchema(FEATURE_NAME);
        }

        final String featureSchema = SUBJECT_ATTRIBUTE + ":String," //
                + PREDICATE_ATTRIBUTE + ":String," //
                + OBJECT_ATTRIBUTE + ":String," //
                + CONTEXT_ATTRIBUTE + ":String," //
                + GEOMETRY_ATTRIBUTE + ":Geometry:srid=4326;geomesa.mixed.geometries='true'";
        final SimpleFeatureType featureType = SimpleFeatureTypes.createType(FEATURE_NAME, featureSchema);
        dataStore.createSchema(featureType);
        return featureType;
    }

    /**
     * Converts the statements that this index accepts into features.
     * <p>
     * A statement is accepted when its predicate is indexable (an empty predicate list accepts all
     * predicates) and its object is a {@link Literal}. Statements whose geometry cannot be created are
     * logged and skipped.
     *
     * @param ryaStatements - The statements to convert.
     * @return The features built from the accepted statements; possibly empty.
     */
    private DefaultFeatureCollection toFeatureCollection(final Collection<RyaStatement> ryaStatements) {
        final DefaultFeatureCollection featureCollection = new DefaultFeatureCollection();
        for (final RyaStatement ryaStatement : ryaStatements) {
            final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement);
            // if the predicate list is empty, accept all predicates.
            // Otherwise, make sure the predicate is on the "valid" list
            final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate());

            if (isValidPredicate && (statement.getObject() instanceof Literal)) {
                try {
                    final SimpleFeature feature = createFeature(featureType, statement);
                    featureCollection.add(feature);
                } catch (final ParseException e) {
                    logger.warn("Error getting geo from statement: " + statement.toString(), e);
                }
            }
        }
        return featureCollection;
    }

    /**
     * Writes the features of all accepted statements to the feature store.
     *
     * @param ryaStatements - The statements to index.
     * @throws IOException if the feature store rejects the write.
     */
    @Override
    public void storeStatements(final Collection<RyaStatement> ryaStatements) throws IOException {
        // create a feature collection
        final DefaultFeatureCollection featureCollection = toFeatureCollection(ryaStatements);

        // write this feature collection to the store
        if (!featureCollection.isEmpty()) {
            featureStore.addFeatures(featureCollection);
        }
    }

    /**
     * Indexes a single statement; see {@link #storeStatements(Collection)}.
     *
     * @param statement - The statement to index.
     * @throws IOException if the feature store rejects the write.
     */
    @Override
    public void storeStatement(final RyaStatement statement) throws IOException {
        storeStatements(Collections.singleton(statement));
    }

    /**
     * Builds the feature for a statement. The feature ID is the MD5/Base64 hash of the serialized
     * statement, so the same statement always maps to the same feature ID.
     *
     * @param featureType - The feature type to build the feature for.
     * @param statement - The statement to convert.
     * @return The populated feature.
     * @throws ParseException if no geometry can be obtained, or it is empty or invalid.
     */
    private static SimpleFeature createFeature(final SimpleFeatureType featureType, final Statement statement) throws ParseException {
        final String subject = StatementSerializer.writeSubject(statement);
        final String predicate = StatementSerializer.writePredicate(statement);
        final String object = StatementSerializer.writeObject(statement);
        final String context = StatementSerializer.writeContext(statement);

        // create the feature
        final Object[] noValues = {};

        // create the hash
        final String statementId = Md5Hash.md5Base64(StatementSerializer.writeStatement(statement));
        final SimpleFeature newFeature = SimpleFeatureBuilder.build(featureType, noValues, statementId);

        // write the statement data to the fields
        final Geometry geom = GeoParseUtils.getGeometry(statement, new GmlParser());
        if (geom == null || geom.isEmpty() || !geom.isValid()) {
            throw new ParseException("Could not create geometry for statement " + statement);
        }
        newFeature.setDefaultGeometry(geom);

        newFeature.setAttribute(SUBJECT_ATTRIBUTE, subject);
        newFeature.setAttribute(PREDICATE_ATTRIBUTE, predicate);
        newFeature.setAttribute(OBJECT_ATTRIBUTE, object);
        newFeature.setAttribute(CONTEXT_ATTRIBUTE, context);

        // preserve the ID that we created for this feature
        // (set the hint to FALSE to have GeoTools generate IDs)
        newFeature.getUserData().put(Hints.USE_PROVIDED_FID, Boolean.TRUE);

        return newFeature;
    }

    /**
     * Renders a value for use inside a single-quoted ECQL string literal by doubling every embedded
     * single quote, so the value cannot terminate the literal early.
     *
     * @param value - The value to render; rendered with {@link String#valueOf(Object)}.
     * @return The escaped text, without the surrounding quotes.
     */
    private static String escapeCqlLiteral(final Object value) {
        return String.valueOf(value).replace("'", "''");
    }

    /**
     * Builds the ECQL filter for a spatial predicate plus the optional subject, context and predicate
     * constraints, and returns a lazily evaluated iteration over the matching statements.
     *
     * @param type - The ECQL spatial predicate name, e.g. {@code INTERSECTS}.
     * @param geometry - The query geometry.
     * @param contraints - The statement constraints to AND with the spatial predicate.
     * @return The iteration over matching statements.
     */
    private CloseableIteration<Statement, QueryEvaluationException> performQuery(final String type, final Geometry geometry,
            final StatementConstraints contraints) {
        final List<String> filterParms = new ArrayList<>();

        filterParms.add(type + "(" + GEOMETRY_ATTRIBUTE + ", " + geometry + " )");

        if (contraints.hasSubject()) {
            filterParms.add("( " + SUBJECT_ATTRIBUTE + "= '" + escapeCqlLiteral(contraints.getSubject()) + "') ");
        }
        if (contraints.hasContext()) {
            filterParms.add("( " + CONTEXT_ATTRIBUTE + "= '" + escapeCqlLiteral(contraints.getContext()) + "') ");
        }
        if (contraints.hasPredicates()) {
            final List<String> predicates = new ArrayList<>();
            for (final URI u : contraints.getPredicates()) {
                predicates.add("( " + PREDICATE_ATTRIBUTE + "= '" + escapeCqlLiteral(u.stringValue()) + "') ");
            }
            filterParms.add("(" + StringUtils.join(predicates, " OR ") + ")");
        }

        final String filterString = StringUtils.join(filterParms, " AND ");
        logger.info("Performing geomesa query : " + filterString);

        return getIteratorWrapper(filterString);
    }

    /**
     * Wraps a filter string in an iteration that parses the filter and runs the query on first use.
     *
     * @param filterString - The ECQL filter to evaluate.
     * @return The iteration over the statements of the matching features.
     */
    private CloseableIteration<Statement, QueryEvaluationException> getIteratorWrapper(final String filterString) {

        return new CloseableIteration<Statement, QueryEvaluationException>() {

            private FeatureIterator<SimpleFeature> featureIterator = null;

            /** Parses the filter and runs the query the first time it is called; reuses the result afterwards. */
            FeatureIterator<SimpleFeature> getIterator() throws QueryEvaluationException {
                if (featureIterator == null) {
                    Filter cqlFilter;
                    try {
                        cqlFilter = ECQL.toFilter(filterString);
                    } catch (final CQLException e) {
                        logger.error("Error parsing query: " + filterString, e);
                        throw new QueryEvaluationException(e);
                    }

                    final Query query = new Query(featureType.getTypeName(), cqlFilter);
                    try {
                        featureIterator = featureSource.getFeatures(query).features();
                    } catch (final IOException e) {
                        logger.error("Error performing query: " + filterString, e);
                        throw new QueryEvaluationException(e);
                    }
                }
                return featureIterator;
            }

            @Override
            public boolean hasNext() throws QueryEvaluationException {
                return getIterator().hasNext();
            }

            @Override
            public Statement next() throws QueryEvaluationException {
                final SimpleFeature feature = getIterator().next();
                final String subjectString = feature.getAttribute(SUBJECT_ATTRIBUTE).toString();
                final String predicateString = feature.getAttribute(PREDICATE_ATTRIBUTE).toString();
                final String objectString = feature.getAttribute(OBJECT_ATTRIBUTE).toString();
                final String contextString = feature.getAttribute(CONTEXT_ATTRIBUTE).toString();
                return StatementSerializer.readStatement(subjectString, predicateString, objectString, contextString);
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException("Remove not implemented");
            }

            @Override
            public void close() throws QueryEvaluationException {
                // Only close what was opened: going through getIterator() here would run the
                // query (and possibly fail on a bad filter) just to close an unused iteration.
                if (featureIterator != null) {
                    featureIterator.close();
                }
            }
        };
    }

    @Override
    public CloseableIteration<Statement, QueryEvaluationException> queryEquals(final Geometry query, final StatementConstraints contraints) {
        return performQuery("EQUALS", query, contraints);
    }

    @Override
    public CloseableIteration<Statement, QueryEvaluationException> queryDisjoint(final Geometry query, final StatementConstraints contraints) {
        return performQuery("DISJOINT", query, contraints);
    }

    @Override
    public CloseableIteration<Statement, QueryEvaluationException> queryIntersects(final Geometry query, final StatementConstraints contraints) {
        return performQuery("INTERSECTS", query, contraints);
    }

    @Override
    public CloseableIteration<Statement, QueryEvaluationException> queryTouches(final Geometry query, final StatementConstraints contraints) {
        return performQuery("TOUCHES", query, contraints);
    }

    @Override
    public CloseableIteration<Statement, QueryEvaluationException> queryCrosses(final Geometry query, final StatementConstraints contraints) {
        return performQuery("CROSSES", query, contraints);
    }

    @Override
    public CloseableIteration<Statement, QueryEvaluationException> queryWithin(final Geometry query, final StatementConstraints contraints) {
        return performQuery("WITHIN", query, contraints);
    }

    @Override
    public CloseableIteration<Statement, QueryEvaluationException> queryContains(final Geometry query, final StatementConstraints contraints) {
        return performQuery("CONTAINS", query, contraints);
    }

    @Override
    public CloseableIteration<Statement, QueryEvaluationException> queryOverlaps(final Geometry query, final StatementConstraints contraints) {
        return performQuery("OVERLAPS", query, contraints);
    }

    /**
     * Near queries are not implemented by this indexer.
     *
     * @throws UnsupportedOperationException always.
     */
    @Override
    public CloseableIteration<Statement, QueryEvaluationException> queryNear(final NearQuery query,
            final StatementConstraints contraints) {
        throw new UnsupportedOperationException("Near queries are not supported in Accumulo.");
    }

    /**
     * @return The geo predicates read from the configuration during initialization.
     */
    @Override
    public Set<URI> getIndexablePredicates() {
        return validPredicates;
    }

    @Override
    public void flush() throws IOException {
        // TODO cache and flush features instead of writing them one at a time
    }

    @Override
    public void close() throws IOException {
        flush();
    }


    @Override
    public String getTableName() {
        return getTableName(conf);
    }

    /**
     * Get the Accumulo table that will be used by this index.
     * @param conf - The configuration holding the table prefix.
     * @return table name guaranteed to be used by instances of this index
     */
    public static String getTableName(final Configuration conf) {
        return makeTableName( ConfigUtils.getTablePrefix(conf) );
    }

    /**
     * Make the Accumulo table name used by this indexer for a specific instance of Rya.
     *
     * @param ryaInstanceName - The name of the Rya instance the table name is for. (not null)
     * @return The Accumulo table name used by this indexer for a specific instance of Rya.
     */
    public static String makeTableName(final String ryaInstanceName) {
        requireNonNull(ryaInstanceName);
        return ryaInstanceName + TABLE_SUFFIX;
    }

    /**
     * Removes the features of all accepted statements from the feature store, matching them by the
     * feature IDs that {@link #createFeature(SimpleFeatureType, Statement)} derives from each statement.
     *
     * @param ryaStatements - The statements to remove from the index.
     * @throws IOException if the feature store rejects the removal.
     */
    private void deleteStatements(final Collection<RyaStatement> ryaStatements) throws IOException {
        // create a feature collection
        final DefaultFeatureCollection featureCollection = toFeatureCollection(ryaStatements);

        // remove this feature collection from the store
        if (!featureCollection.isEmpty()) {
            final Set<Identifier> featureIds = new HashSet<>();
            final FilterFactory filterFactory = CommonFactoryFinder.getFilterFactory(null);
            final Set<String> stringIds = DataUtilities.fidSet(featureCollection);
            for (final String id : stringIds) {
                featureIds.add(filterFactory.featureId(id));
            }
            final Filter filter = filterFactory.id(featureIds);
            featureStore.removeFeatures(filter);
        }
    }


    /**
     * Removes a single statement from the index.
     *
     * @param statement - The statement to remove.
     * @throws IOException if the feature store rejects the removal.
     */
    @Override
    public void deleteStatement(final RyaStatement statement) throws IOException {
        deleteStatements(Collections.singleton(statement));
    }

    @Override
    public void init() {
        // intentionally empty: initialization happens in setConf
    }

    @Override
    public void setConnector(final Connector connector) {
        // intentionally empty: the connector is not used by this indexer
    }

    @Override
    public void destroy() {
        // intentionally empty
    }

    @Override
    public void purge(final RdfCloudTripleStoreConfiguration configuration) {
        // intentionally empty
    }

    @Override
    public void dropAndDestroy() {
        // intentionally empty
    }
}
