http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.geowave/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoWaveGeoIndexer.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/geo.geowave/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoWaveGeoIndexer.java b/extras/rya.geoindexing/geo.geowave/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoWaveGeoIndexer.java new file mode 100644 index 0000000..db7af05 --- /dev/null +++ b/extras/rya.geoindexing/geo.geowave/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoWaveGeoIndexer.java @@ -0,0 +1,668 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.accumulo.geo; + +import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; +import org.apache.rya.accumulo.experimental.AbstractAccumuloIndexer; +import org.apache.rya.api.RdfCloudTripleStoreConfiguration; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.resolver.RyaToRdfConversions; +import org.apache.rya.indexing.GeoIndexer; +import org.apache.rya.indexing.Md5Hash; +import org.apache.rya.indexing.StatementConstraints; +import org.apache.rya.indexing.StatementSerializer; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.indexing.accumulo.geo.GeoTupleSet.GeoSearchFunctionFactory.NearQuery; +import org.geotools.data.DataStore; +import org.geotools.data.DataUtilities; +import org.geotools.data.FeatureSource; +import org.geotools.data.FeatureStore; +import org.geotools.factory.CommonFactoryFinder; +import org.geotools.factory.Hints; +import org.geotools.feature.DefaultFeatureCollection; +import org.geotools.feature.SchemaException; +import org.geotools.feature.simple.SimpleFeatureBuilder; +import org.geotools.filter.text.cql2.CQLException; +import org.geotools.filter.text.ecql.ECQL; +import org.opengis.feature.simple.SimpleFeature; +import org.opengis.feature.simple.SimpleFeatureType; +import 
org.opengis.filter.Filter; +import org.opengis.filter.FilterFactory; +import org.opengis.filter.identity.Identifier; +import org.openrdf.model.Literal; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.query.QueryEvaluationException; + +import com.vividsolutions.jts.geom.Geometry; +import com.vividsolutions.jts.io.ParseException; + +import info.aduna.iteration.CloseableIteration; +import mil.nga.giat.geowave.adapter.vector.FeatureDataAdapter; +import mil.nga.giat.geowave.adapter.vector.plugin.GeoWaveGTDataStore; +import mil.nga.giat.geowave.adapter.vector.plugin.GeoWaveGTDataStoreFactory; +import mil.nga.giat.geowave.adapter.vector.plugin.GeoWavePluginException; +import mil.nga.giat.geowave.adapter.vector.query.cql.CQLQuery; +import mil.nga.giat.geowave.core.geotime.ingest.SpatialDimensionalityTypeProvider; +import mil.nga.giat.geowave.core.store.CloseableIterator; +import mil.nga.giat.geowave.core.store.StoreFactoryFamilySpi; +import mil.nga.giat.geowave.core.store.index.PrimaryIndex; +import mil.nga.giat.geowave.core.store.memory.MemoryStoreFactoryFamily; +import mil.nga.giat.geowave.core.store.query.EverythingQuery; +import mil.nga.giat.geowave.core.store.query.QueryOptions; +import mil.nga.giat.geowave.datastore.accumulo.AccumuloDataStore; +import mil.nga.giat.geowave.datastore.accumulo.AccumuloStoreFactoryFamily; + +/** + * A {@link GeoIndexer} wrapper around a GeoWave {@link AccumuloDataStore}. This class configures and connects to the Datastore, creates the + * RDF Feature Type, and interacts with the Datastore. + * <p> + * Specifically, this class creates a RDF Feature type and stores each RDF Statement as a RDF Feature in the datastore. Each feature + * contains the standard set of GeoWave attributes (Geometry, Start Date, and End Date). The GeoWaveGeoIndexer populates the Geometry + * attribute by parsing the Well-Known Text contained in the RDF Statementâs object literal value. 
+ * <p> + * The RDF Feature contains four additional attributes for each component of the RDF Statement. These attributes are: + * <p> + * <table border="1"> + * <tr> + * <th>Name</th> + * <th>Symbol</th> + * <th>Type</th> + * </tr> + * <tr> + * <td>Subject Attribute</td> + * <td>S</td> + * <td>String</td> + * </tr> + * </tr> + * <tr> + * <td>Predicate Attribute</td> + * <td>P</td> + * <td>String</td> + * </tr> + * </tr> + * <tr> + * <td>Object Attribute</td> + * <td>O</td> + * <td>String</td> + * </tr> + * </tr> + * <tr> + * <td>Context Attribute</td> + * <td>C</td> + * <td>String</td> + * </tr> + * </table> + */ +public class GeoWaveGeoIndexer extends AbstractAccumuloIndexer implements GeoIndexer { + + private static final String TABLE_SUFFIX = "geo"; + + private static final Logger logger = Logger.getLogger(GeoWaveGeoIndexer.class); + + private static final String FEATURE_NAME = "RDF"; + + private static final String SUBJECT_ATTRIBUTE = "S"; + private static final String PREDICATE_ATTRIBUTE = "P"; + private static final String OBJECT_ATTRIBUTE = "O"; + private static final String CONTEXT_ATTRIBUTE = "C"; + private static final String GEO_ID_ATTRIBUTE = "geo_id"; + private static final String GEOMETRY_ATTRIBUTE = "geowave_index_geometry"; + + private Set<URI> validPredicates; + private Configuration conf; + private FeatureStore<SimpleFeatureType, SimpleFeature> featureStore; + private FeatureSource<SimpleFeatureType, SimpleFeature> featureSource; + private SimpleFeatureType featureType; + private FeatureDataAdapter featureDataAdapter; + private DataStore geoToolsDataStore; + private mil.nga.giat.geowave.core.store.DataStore geoWaveDataStore; + private final PrimaryIndex index = new SpatialDimensionalityTypeProvider().createPrimaryIndex(); + private boolean isInit = false; + + //initialization occurs in setConf because index is created using reflection + @Override + public void setConf(final Configuration conf) { + this.conf = conf; + if (!isInit) { + try { + 
initInternal(); + isInit = true; + } catch (final IOException e) { + logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); + throw new RuntimeException(e); + } + } + } + + @Override + public Configuration getConf() { + return conf; + } + + /** + * @return the internal GeoTools{@link DataStore} used by the {@link GeoWaveGeoIndexer}. + */ + public DataStore getGeoToolsDataStore() { + return geoToolsDataStore; + } + + /** + * @return the internal GeoWave {@link DataStore} used by the {@link GeoWaveGeoIndexer}. + */ + public mil.nga.giat.geowave.core.store.DataStore getGeoWaveDataStore() { + return geoWaveDataStore; + } + + private void initInternal() throws IOException { + validPredicates = ConfigUtils.getGeoPredicates(conf); + + try { + geoToolsDataStore = createDataStore(conf); + geoWaveDataStore = ((GeoWaveGTDataStore) geoToolsDataStore).getDataStore(); + } catch (final GeoWavePluginException e) { + logger.error("Failed to create GeoWave data store", e); + } + + try { + featureType = getStatementFeatureType(geoToolsDataStore); + } catch (final IOException | SchemaException e) { + throw new IOException(e); + } + + featureDataAdapter = new FeatureDataAdapter(featureType); + + featureSource = geoToolsDataStore.getFeatureSource(featureType.getName()); + if (!(featureSource instanceof FeatureStore)) { + throw new IllegalStateException("Could not retrieve feature store"); + } + featureStore = (FeatureStore<SimpleFeatureType, SimpleFeature>) featureSource; + } + + public Map<String, Serializable> getParams(final Configuration conf) { + // get the configuration parameters + final Instance instance = ConfigUtils.getInstance(conf); + final String instanceId = instance.getInstanceName(); + final String zookeepers = instance.getZooKeepers(); + final String user = ConfigUtils.getUsername(conf); + final String password = ConfigUtils.getPassword(conf); + final String auths = ConfigUtils.getAuthorizations(conf).toString(); + final String tableName = 
getTableName(conf); + final String tablePrefix = ConfigUtils.getTablePrefix(conf); + + final Map<String, Serializable> params = new HashMap<>(); + params.put("zookeeper", zookeepers); + params.put("instance", instanceId); + params.put("user", user); + params.put("password", password); + params.put("namespace", tableName); + params.put("gwNamespace", tablePrefix + getClass().getSimpleName()); + + params.put("Lock Management", LockManagementType.MEMORY.toString()); + params.put("Authorization Management Provider", AuthorizationManagementProviderType.EMPTY.toString()); + params.put("Authorization Data URL", null); + params.put("Transaction Buffer Size", 10000); + params.put("Query Index Strategy", QueryIndexStrategyType.HEURISTIC_MATCH.toString()); + return params; + } + + /** + * Creates the {@link DataStore} for the {@link GeoWaveGeoIndexer}. + * @param conf the {@link Configuration}. + * @return the {@link DataStore}. + */ + public DataStore createDataStore(final Configuration conf) throws IOException, GeoWavePluginException { + final Map<String, Serializable> params = getParams(conf); + final Instance instance = ConfigUtils.getInstance(conf); + final boolean useMock = instance instanceof MockInstance; + + final StoreFactoryFamilySpi storeFactoryFamily; + if (useMock) { + storeFactoryFamily = new MemoryStoreFactoryFamily(); + } else { + storeFactoryFamily = new AccumuloStoreFactoryFamily(); + } + + final GeoWaveGTDataStoreFactory geoWaveGTDataStoreFactory = new GeoWaveGTDataStoreFactory(storeFactoryFamily); + final DataStore dataStore = geoWaveGTDataStoreFactory.createNewDataStore(params); + + return dataStore; + } + + private static SimpleFeatureType getStatementFeatureType(final DataStore dataStore) throws IOException, SchemaException { + SimpleFeatureType featureType; + + final String[] datastoreFeatures = dataStore.getTypeNames(); + if (Arrays.asList(datastoreFeatures).contains(FEATURE_NAME)) { + featureType = dataStore.getSchema(FEATURE_NAME); + } else { + 
featureType = DataUtilities.createType(FEATURE_NAME, + SUBJECT_ATTRIBUTE + ":String," + + PREDICATE_ATTRIBUTE + ":String," + + OBJECT_ATTRIBUTE + ":String," + + CONTEXT_ATTRIBUTE + ":String," + + GEOMETRY_ATTRIBUTE + ":Geometry:srid=4326," + + GEO_ID_ATTRIBUTE + ":String"); + + dataStore.createSchema(featureType); + } + return featureType; + } + + @Override + public void storeStatements(final Collection<RyaStatement> ryaStatements) throws IOException { + // create a feature collection + final DefaultFeatureCollection featureCollection = new DefaultFeatureCollection(); + for (final RyaStatement ryaStatement : ryaStatements) { + final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement); + // if the predicate list is empty, accept all predicates. + // Otherwise, make sure the predicate is on the "valid" list + final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); + + if (isValidPredicate && (statement.getObject() instanceof Literal)) { + try { + final SimpleFeature feature = createFeature(featureType, statement); + featureCollection.add(feature); + } catch (final ParseException e) { + logger.warn("Error getting geo from statement: " + statement.toString(), e); + } + } + } + + // write this feature collection to the store + if (!featureCollection.isEmpty()) { + featureStore.addFeatures(featureCollection); + } + } + + @Override + public void storeStatement(final RyaStatement statement) throws IOException { + storeStatements(Collections.singleton(statement)); + } + + private static SimpleFeature createFeature(final SimpleFeatureType featureType, final Statement statement) throws ParseException { + final String subject = StatementSerializer.writeSubject(statement); + final String predicate = StatementSerializer.writePredicate(statement); + final String object = StatementSerializer.writeObject(statement); + final String context = StatementSerializer.writeContext(statement); + + // create the 
feature + final Object[] noValues = {}; + + // create the hash + final String statementId = Md5Hash.md5Base64(StatementSerializer.writeStatement(statement)); + final SimpleFeature newFeature = SimpleFeatureBuilder.build(featureType, noValues, statementId); + + // write the statement data to the fields + final Geometry geom = GeoParseUtils.getGeometry(statement, new GmlParser()); + if(geom == null || geom.isEmpty() || !geom.isValid()) { + throw new ParseException("Could not create geometry for statement " + statement); + } + newFeature.setDefaultGeometry(geom); + + newFeature.setAttribute(SUBJECT_ATTRIBUTE, subject); + newFeature.setAttribute(PREDICATE_ATTRIBUTE, predicate); + newFeature.setAttribute(OBJECT_ATTRIBUTE, object); + newFeature.setAttribute(CONTEXT_ATTRIBUTE, context); + // GeoWave does not support querying based on a user generated feature ID + // So, we create a separate ID attribute that it can query on. + newFeature.setAttribute(GEO_ID_ATTRIBUTE, statementId); + + // preserve the ID that we created for this feature + // (set the hint to FALSE to have GeoTools generate IDs) + newFeature.getUserData().put(Hints.USE_PROVIDED_FID, java.lang.Boolean.TRUE); + + return newFeature; + } + + private CloseableIteration<Statement, QueryEvaluationException> performQuery(final String type, final Geometry geometry, + final StatementConstraints contraints) { + final List<String> filterParms = new ArrayList<String>(); + + filterParms.add(type + "(" + GEOMETRY_ATTRIBUTE + ", " + geometry + " )"); + + if (contraints.hasSubject()) { + filterParms.add("( " + SUBJECT_ATTRIBUTE + "= '" + contraints.getSubject() + "') "); + } + if (contraints.hasContext()) { + filterParms.add("( " + CONTEXT_ATTRIBUTE + "= '" + contraints.getContext() + "') "); + } + if (contraints.hasPredicates()) { + final List<String> predicates = new ArrayList<String>(); + for (final URI u : contraints.getPredicates()) { + predicates.add("( " + PREDICATE_ATTRIBUTE + "= '" + u.stringValue() + "') "); + } 
+ filterParms.add("(" + StringUtils.join(predicates, " OR ") + ")"); + } + + final String filterString = StringUtils.join(filterParms, " AND "); + logger.info("Performing geowave query : " + filterString); + + return getIteratorWrapper(filterString); + } + + private CloseableIteration<Statement, QueryEvaluationException> getIteratorWrapper(final String filterString) { + + return new CloseableIteration<Statement, QueryEvaluationException>() { + + private CloseableIterator<SimpleFeature> featureIterator = null; + + CloseableIterator<SimpleFeature> getIterator() throws QueryEvaluationException { + if (featureIterator == null) { + Filter cqlFilter; + try { + cqlFilter = ECQL.toFilter(filterString); + } catch (final CQLException e) { + logger.error("Error parsing query: " + filterString, e); + throw new QueryEvaluationException(e); + } + + final CQLQuery cqlQuery = new CQLQuery(null, cqlFilter, featureDataAdapter); + final QueryOptions queryOptions = new QueryOptions(featureDataAdapter, index); + + try { + featureIterator = geoWaveDataStore.query(queryOptions, cqlQuery); + } catch (final Exception e) { + logger.error("Error performing query: " + filterString, e); + throw new QueryEvaluationException(e); + } + } + return featureIterator; + } + + @Override + public boolean hasNext() throws QueryEvaluationException { + return getIterator().hasNext(); + } + + @Override + public Statement next() throws QueryEvaluationException { + final SimpleFeature feature = getIterator().next(); + final String subjectString = feature.getAttribute(SUBJECT_ATTRIBUTE).toString(); + final String predicateString = feature.getAttribute(PREDICATE_ATTRIBUTE).toString(); + final String objectString = feature.getAttribute(OBJECT_ATTRIBUTE).toString(); + final Object context = feature.getAttribute(CONTEXT_ATTRIBUTE); + final String contextString = context != null ? 
context.toString() : ""; + final Statement statement = StatementSerializer.readStatement(subjectString, predicateString, objectString, contextString); + return statement; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Remove not implemented"); + } + + @Override + public void close() throws QueryEvaluationException { + try { + getIterator().close(); + } catch (final IOException e) { + throw new QueryEvaluationException(e); + } + } + }; + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryEquals(final Geometry query, final StatementConstraints contraints) { + return performQuery("EQUALS", query, contraints); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryDisjoint(final Geometry query, final StatementConstraints contraints) { + return performQuery("DISJOINT", query, contraints); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryIntersects(final Geometry query, final StatementConstraints contraints) { + return performQuery("INTERSECTS", query, contraints); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryTouches(final Geometry query, final StatementConstraints contraints) { + return performQuery("TOUCHES", query, contraints); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryCrosses(final Geometry query, final StatementConstraints contraints) { + return performQuery("CROSSES", query, contraints); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryWithin(final Geometry query, final StatementConstraints contraints) { + return performQuery("WITHIN", query, contraints); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryContains(final Geometry query, final StatementConstraints contraints) { + return performQuery("CONTAINS", query, contraints); + } + + @Override + public 
CloseableIteration<Statement, QueryEvaluationException> queryOverlaps(final Geometry query, final StatementConstraints contraints) { + return performQuery("OVERLAPS", query, contraints); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryNear(final NearQuery query, + final StatementConstraints contraints) { + throw new UnsupportedOperationException("Near queries are not supported in Accumulo."); + } + + @Override + public Set<URI> getIndexablePredicates() { + return validPredicates; + } + + @Override + public void flush() throws IOException { + // TODO cache and flush features instead of writing them one at a time + } + + @Override + public void close() throws IOException { + flush(); + } + + + @Override + public String getTableName() { + return getTableName(conf); + } + + /** + * Get the Accumulo table that will be used by this index. + * @param conf + * @return table name guaranteed to be used by instances of this index + */ + public static String getTableName(final Configuration conf) { + return makeTableName( ConfigUtils.getTablePrefix(conf) ); + } + + /** + * Make the Accumulo table name used by this indexer for a specific instance of Rya. + * + * @param ryaInstanceName - The name of the Rya instance the table name is for. (not null) + * @return The Accumulo table name used by this indexer for a specific instance of Rya. + */ + public static String makeTableName(final String ryaInstanceName) { + requireNonNull(ryaInstanceName); + return ryaInstanceName + TABLE_SUFFIX; + } + + private void deleteStatements(final Collection<RyaStatement> ryaStatements) throws IOException { + // create a feature collection + final DefaultFeatureCollection featureCollection = new DefaultFeatureCollection(); + + for (final RyaStatement ryaStatement : ryaStatements) { + final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement); + // if the predicate list is empty, accept all predicates. 
+ // Otherwise, make sure the predicate is on the "valid" list + final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); + + if (isValidPredicate && (statement.getObject() instanceof Literal)) { + try { + final SimpleFeature feature = createFeature(featureType, statement); + featureCollection.add(feature); + } catch (final ParseException e) { + logger.warn("Error getting geo from statement: " + statement.toString(), e); + } + } + } + + // remove this feature collection from the store + if (!featureCollection.isEmpty()) { + final Set<Identifier> featureIds = new HashSet<Identifier>(); + final FilterFactory filterFactory = CommonFactoryFinder.getFilterFactory(null); + final Set<String> stringIds = DataUtilities.fidSet(featureCollection); + for (final String id : stringIds) { + featureIds.add(filterFactory.featureId(id)); + } + + final String filterString = stringIds.stream().collect(Collectors.joining("','", "'", "'")); + + Filter filter = null; + try { + filter = ECQL.toFilter(GEO_ID_ATTRIBUTE + " IN (" + filterString + ")", filterFactory); + } catch (final CQLException e) { + logger.error("Unable to generate filter for deleting the statement.", e); + } + + featureStore.removeFeatures(filter); + } + } + + + @Override + public void deleteStatement(final RyaStatement statement) throws IOException { + deleteStatements(Collections.singleton(statement)); + } + + @Override + public void init() { + } + + @Override + public void setConnector(final Connector connector) { + } + + @Override + public void destroy() { + } + + @Override + public void purge(final RdfCloudTripleStoreConfiguration configuration) { + // delete existing data + geoWaveDataStore.delete(new QueryOptions(), new EverythingQuery()); + } + + @Override + public void dropAndDestroy() { + } + + /** + * The list of supported Geo Wave {@code LockingManagementFactory} types. 
+ */ + private static enum LockManagementType { + MEMORY("memory"); + + private final String name; + + /** + * Creates a new {@link LockManagementType}. + * @param name the name of the type. (not {@code null}) + */ + private LockManagementType(final String name) { + this.name = checkNotNull(name); + } + + @Override + public String toString() { + return name; + } + } + + /** + * The list of supported Geo Wave {@code AuthorizationFactorySPI } types. + */ + private static enum AuthorizationManagementProviderType { + EMPTY("empty"), + JSON_FILE("jsonFile"); + + private final String name; + + /** + * Creates a new {@link AuthorizationManagementProviderType}. + * @param name the name of the type. (not {@code null}) + */ + private AuthorizationManagementProviderType(final String name) { + this.name = checkNotNull(name); + } + + @Override + public String toString() { + return name; + } + } + + /** + * The list of supported Geo Wave {@code IndexQueryStrategySPI} types. + */ + private static enum QueryIndexStrategyType { + BEST_MATCH("Best Match"), + HEURISTIC_MATCH("Heuristic Match"), + PRESERVE_LOCALITY("Preserve Locality"); + + private final String name; + + /** + * Creates a new {@link QueryIndexStrategyType}. + * @param name the name of the type. (not {@code null}) + */ + private QueryIndexStrategyType(final String name) { + this.name = checkNotNull(name); + } + + @Override + public String toString() { + return name; + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.geowave/src/main/java/org/apache/rya/indexing/accumulo/geo/GmlParser.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/geo.geowave/src/main/java/org/apache/rya/indexing/accumulo/geo/GmlParser.java b/extras/rya.geoindexing/geo.geowave/src/main/java/org/apache/rya/indexing/accumulo/geo/GmlParser.java new file mode 100644 index 0000000..af72b3a --- /dev/null +++ b/extras/rya.geoindexing/geo.geowave/src/main/java/org/apache/rya/indexing/accumulo/geo/GmlParser.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.accumulo.geo; + +import java.io.IOException; +import java.io.Reader; + +import javax.xml.parsers.ParserConfigurationException; + +import org.apache.rya.indexing.accumulo.geo.GeoParseUtils.GmlToGeometryParser; +import org.geotools.gml3.GMLConfiguration; +import org.xml.sax.SAXException; + +import com.vividsolutions.jts.geom.Geometry; + + +/** + * This wraps geotools parser for rya.geoCommon that cannot be dependent on geotools. 
+ * + */ +public class GmlParser implements GmlToGeometryParser { + + /* (non-Javadoc) + * @see org.apache.rya.indexing.accumulo.geo.GeoParseUtils.GmlToGeometryParser#parse(java.io.Reader) + */ + @Override + public Geometry parse(Reader reader) throws IOException, SAXException, ParserConfigurationException { + final org.geotools.xml.Parser gmlParser = new org.geotools.xml.Parser(new GMLConfiguration()); + return (Geometry) gmlParser.parse(reader); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.geowave/src/main/java/org/apache/rya/indexing/geoExamples/GeowaveDirectExample.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/geo.geowave/src/main/java/org/apache/rya/indexing/geoExamples/GeowaveDirectExample.java b/extras/rya.geoindexing/geo.geowave/src/main/java/org/apache/rya/indexing/geoExamples/GeowaveDirectExample.java new file mode 100644 index 0000000..33a4bec --- /dev/null +++ b/extras/rya.geoindexing/geo.geowave/src/main/java/org/apache/rya/indexing/geoExamples/GeowaveDirectExample.java @@ -0,0 +1,445 @@ +package org.apache.rya.indexing.geoExamples; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.List; +import org.apache.commons.lang.Validate; +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.GeoIndexerType; +import org.apache.rya.indexing.GeoRyaSailFactory; +import org.apache.rya.indexing.accumulo.AccumuloIndexingConfiguration; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.indexing.accumulo.geo.OptionalConfigUtils; +import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig; +import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.QueryLanguage; +import org.openrdf.query.QueryResultHandlerException; +import org.openrdf.query.TupleQuery; +import org.openrdf.query.TupleQueryResultHandler; +import org.openrdf.query.TupleQueryResultHandlerException; +import org.openrdf.query.Update; +import org.openrdf.query.UpdateExecutionException; +import org.openrdf.repository.RepositoryException; +import org.openrdf.repository.sail.SailRepository; +import org.openrdf.repository.sail.SailRepositoryConnection; +import org.openrdf.sail.Sail; + +public class GeowaveDirectExample { + private static final Logger log = Logger.getLogger(GeowaveDirectExample.class); + + // + // Connection configuration parameters + // + + private static final boolean USE_MOCK_INSTANCE = true; + private static final boolean PRINT_QUERIES = true; + private static final String INSTANCE = "instance"; + private static final String RYA_TABLE_PREFIX = "x_test_triplestore_"; + private static final String AUTHS = "U"; + + public static void main(final String[] args) throws Exception { + final Configuration conf = getConf(); + 
conf.set(PrecomputedJoinIndexerConfig.PCJ_STORAGE_TYPE, PrecomputedJoinStorageType.ACCUMULO.name()); + conf.setBoolean(ConfigUtils.DISPLAY_QUERY_PLAN, PRINT_QUERIES); + conf.setBoolean(OptionalConfigUtils.USE_GEO, true); + conf.setEnum(OptionalConfigUtils.GEO_INDEXER_TYPE, GeoIndexerType.GEO_WAVE); + + log.info("Creating the tables as root."); + + SailRepository repository = null; + SailRepositoryConnection conn = null; + + try { + log.info("Connecting to Geo Sail Repository."); + final Sail extSail = GeoRyaSailFactory.getInstance(conf); + repository = new SailRepository(extSail); + conn = repository.getConnection(); + + final long start = System.currentTimeMillis(); + log.info("Running SPARQL Example: Add Point and Geo Search with PCJ"); + testAddPointAndWithinSearchWithPCJ(conn); + log.info("Running SPARQL Example: Temporal, Freetext, and Geo Search"); + testTemporalFreeGeoSearch(conn); + log.info("Running SPARQL Example: Geo, Freetext, and PCJ Search"); + testGeoFreetextWithPCJSearch(conn); + log.info("Running SPARQL Example: Delete Geo Data"); + testDeleteGeoData(conn); + + log.info("TIME: " + (System.currentTimeMillis() - start) / 1000.); + } finally { + log.info("Shutting down"); + closeQuietly(conn); + closeQuietly(repository); + } + } + + private static void closeQuietly(final SailRepository repository) { + if (repository != null) { + try { + repository.shutDown(); + } catch (final RepositoryException e) { + // quietly absorb this exception + } + } + } + + private static void closeQuietly(final SailRepositoryConnection conn) { + if (conn != null) { + try { + conn.close(); + } catch (final RepositoryException e) { + // quietly absorb this exception + } + } + } + + private static Configuration getConf() { + + + return AccumuloIndexingConfiguration.builder() + .setUseMockAccumulo(USE_MOCK_INSTANCE) + .setAuths(AUTHS) + .setAccumuloUser("root") + .setAccumuloPassword("") + .setAccumuloInstance(INSTANCE) + .setRyaPrefix(RYA_TABLE_PREFIX) + .setUsePcj(true) + 
.setUseAccumuloFreetextIndex(true) + .setUseAccumuloTemporalIndex(true) + .build(); + + } + + + + private static void testAddPointAndWithinSearchWithPCJ( + final SailRepositoryConnection conn) throws Exception { + + final String update = "PREFIX geo: <http://www.opengis.net/ont/geosparql#> "// + + "INSERT DATA { " // + + " <urn:feature> a geo:Feature ; " // + + " geo:hasGeometry [ " // + + " a geo:Point ; " // + + " geo:asWKT \"Point(-77.03524 38.889468)\"^^geo:wktLiteral "// + + " ] . " // + + "}"; + + final Update u = conn.prepareUpdate(QueryLanguage.SPARQL, update); + u.execute(); + + String queryString; + TupleQuery tupleQuery; + CountingResultHandler tupleHandler; + + // point outside search ring + queryString = "PREFIX geo: <http://www.opengis.net/ont/geosparql#> "// + + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/> "// + + "SELECT ?feature ?point ?wkt " // + + "{" // + + " ?feature a geo:Feature . "// + + " ?feature geo:hasGeometry ?point . "// + + " ?point a geo:Point . "// + + " ?point geo:asWKT ?wkt . "// + + " FILTER(geof:sfWithin(?wkt, \"POLYGON((-77 39, -76 39, -76 38, -77 38, -77 39))\"^^geo:wktLiteral)) " // + + "}";// + tupleQuery = conn.prepareTupleQuery(QueryLanguage.SPARQL, queryString); + tupleHandler = new CountingResultHandler(); + tupleQuery.evaluate(tupleHandler); + log.info("point outside search ring, Result count : " + tupleHandler.getCount()); + Validate.isTrue(tupleHandler.getCount() == 0); + // point inside search ring + queryString = "PREFIX geo: <http://www.opengis.net/ont/geosparql#> "// + + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/> "// + + "SELECT ?feature ?point ?wkt " // ?e ?l ?o" // + + "{" // +// + " ?feature a ?e . "// +// + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l . "// +// + " ?e <uri:talksTo> ?o . "// + + " ?feature a geo:Feature . "// + + " ?feature geo:hasGeometry ?point . "// + + " ?point a geo:Point . "// + + " ?point geo:asWKT ?wkt . 
"// + + " FILTER(geof:sfWithin(?wkt, \"POLYGON((-78 39, -77 39, -77 38, -78 38, -78 39))\"^^geo:wktLiteral)) " // + + "}";// + + tupleQuery = conn.prepareTupleQuery(QueryLanguage.SPARQL, queryString); + tupleHandler = new CountingResultHandler(); + tupleQuery.evaluate(tupleHandler); + log.info("point inside search ring, Result count : " + tupleHandler.getCount()); + Validate.isTrue(tupleHandler.getCount() == 1); + + // point inside search ring with Pre-Computed Join + queryString = "PREFIX geo: <http://www.opengis.net/ont/geosparql#> "// + + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/> "// + + "SELECT ?feature ?point ?wkt "//?e ?l ?o" // + + "{" // +// + " ?feature a ?e . "// +// + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l . "// +// + " ?e <uri:talksTo> ?o . "// + + " ?feature a geo:Feature . "// + + " ?feature geo:hasGeometry ?point . "// + + " ?point a geo:Point . "// + + " ?point geo:asWKT ?wkt . "// + + " FILTER(geof:sfWithin(?wkt, \"POLYGON((-78 39, -77 39, -77 38, -78 38, -78 39))\"^^geo:wktLiteral)) " // + + "}";// + + tupleQuery = conn.prepareTupleQuery(QueryLanguage.SPARQL, queryString); + tupleHandler = new CountingResultHandler(); + tupleQuery.evaluate(tupleHandler); + log.info("point inside search ring with Pre-Computed Join, Result count : " + tupleHandler.getCount()); + Validate.isTrue(tupleHandler.getCount() >= 1); // may see points from + // during previous runs + + // point outside search ring with PCJ + queryString = "PREFIX geo: <http://www.opengis.net/ont/geosparql#> "// + + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/> "// + + "SELECT ?feature ?point ?wkt "//?e ?l ?o " // + + "{" // +// + " ?feature a ?e . "// +// + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l . "// +// + " ?e <uri:talksTo> ?o . "// + + " ?feature a geo:Feature . "// + + " ?feature geo:hasGeometry ?point . "// + + " ?point a geo:Point . "// + + " ?point geo:asWKT ?wkt . 
"// + + " FILTER(geof:sfWithin(?wkt, \"POLYGON((-77 39, -76 39, -76 38, -77 38, -77 39))\"^^geo:wktLiteral)) " // + + "}";// + tupleQuery = conn.prepareTupleQuery(QueryLanguage.SPARQL, queryString); + tupleHandler = new CountingResultHandler(); + tupleQuery.evaluate(tupleHandler); + log.info("point outside search ring with PCJ, Result count : " + tupleHandler.getCount()); + Validate.isTrue(tupleHandler.getCount() == 0); + + // point inside search ring with different Pre-Computed Join + queryString = "PREFIX geo: <http://www.opengis.net/ont/geosparql#> "// + + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/> "// + + "SELECT ?feature ?point "//?wkt ?e ?c ?l ?o " // + + "{" // +// + " ?e a ?c . "// +// + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l . "// + //+ " ?e <uri:talksTo> ?o . "// + //+ " ?feature a geo:Feature . "// + + " ?feature geo:hasGeometry ?point . "// + + " ?point a geo:Point . "// + + " ?point geo:asWKT ?wkt . "// + + " FILTER(geof:sfWithin(?wkt, \"POLYGON((-78 39, -77 39, -77 38, -78 38, -78 39))\"^^geo:wktLiteral)) " // + + "}";// + tupleQuery = conn.prepareTupleQuery(QueryLanguage.SPARQL, queryString); + tupleHandler = new CountingResultHandler(); + tupleQuery.evaluate(tupleHandler); + log.info("point inside search ring with different Pre-Computed Join, Result count : " + tupleHandler.getCount()); + Validate.isTrue(tupleHandler.getCount() == 1); + } + + private static void testTemporalFreeGeoSearch( + final SailRepositoryConnection conn) + throws MalformedQueryException, RepositoryException, + UpdateExecutionException, TupleQueryResultHandlerException, + QueryEvaluationException { + // Once upon a time, a meeting happened, in a place and time, attended by 5 paladins and another. 
+ final String update = "PREFIX geo: <http://www.opengis.net/ont/geosparql#> "// + + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/> "// + + "PREFIX time: <http://www.w3.org/2006/time#> "// + + "PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> "// + + "PREFIX fts: <http://rdf.useekm.com/fts#> "// + + "PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#> "// + + "PREFIX ex: <http://example.com/#> "// + + "INSERT DATA { " // + + " ex:feature719 a geo:Feature ; " // + + " geo:hasGeometry [ " // + + " a geo:Point ; " // + + " geo:asWKT \"Point(-77.03524 38.889468)\"^^geo:wktLiteral "// + + " ] . "// + + " ex:event719 a time:Instant ;" // + + " time:inXSDDateTime '2001-01-01T01:01:04-08:00' ;" // 4 seconds + + " ex:locatedAt ex:feature719 ;" // + + " ex:attendee ex:person01;" // + + " ex:attendee ex:person02;" // + + " ex:attendee ex:person03;" // + + " ex:attendee [a ex:Person ; rdfs:label 'Paladin Ogier the Dane' ] ;" // Use a blank node instead of person04 + + " ex:attendee ex:person05;" // + + " ex:attendee ex:person06." // + + " ex:person01 a ex:Person ;" // + + " rdfs:label \"Paladin Fossil\"." // + + " ex:person02 a ex:Person ;" // + + " rdfs:label \"Paladin Paul Denning\"." // + + " ex:person03 a ex:Person ;" // + + " rdfs:label 'Paladin Will Travel'." // + + " ex:person05 a ex:Person ;" // + + " rdfs:label 'Paladin dimethyl disulfide'." // + + " ex:person06 a ex:Person ;" // + + " rdfs:label 'Ignore me'." 
// + + "" // + + "}"; + final Update u = conn.prepareUpdate(QueryLanguage.SPARQL, update); + u.execute(); + + String queryString; + TupleQuery tupleQuery; + CountingResultHandler tupleHandler; + + // Find all events after a time, located in a polygon square, whose attendees have label names beginning with "Pal" + queryString = "PREFIX geo: <http://www.opengis.net/ont/geosparql#> "// + + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/> "// + + "PREFIX time: <http://www.w3.org/2006/time#> "// + + "PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> "// + + "PREFIX fts: <http://rdf.useekm.com/fts#> "// + + "PREFIX ex: <http://example.com/#> "// + + "SELECT ?feature ?point ?wkt ?event ?time ?person ?match" // + + "{" // + + " ?event a time:Instant ; \n"// + + " time:inXSDDateTime ?time ; \n"// + + " ex:locatedAt ?feature ;" // + + " ex:attendee ?person." // + + " FILTER(tempo:after(?time, '2001-01-01T01:01:03-08:00') ) \n"// after 3 seconds + + " ?feature a geo:Feature . "// + + " ?feature geo:hasGeometry ?point . "// + + " ?point a geo:Point . "// + + " ?point geo:asWKT ?wkt . "// + + " FILTER(geof:sfWithin(?wkt, \"POLYGON((-78 39, -77 39, -77 38, -78 38, -78 39))\"^^geo:wktLiteral)). " // + + " ?person a ex:Person . "// + + " ?person <http://www.w3.org/2000/01/rdf-schema#label> ?match . 
"// + + " FILTER(fts:text(?match, \"Pal*\")) " // + + "}";// + + tupleQuery = conn.prepareTupleQuery(QueryLanguage.SPARQL, queryString); + + tupleHandler = new CountingResultHandler(); + tupleQuery.evaluate(tupleHandler); + log.info("Result count : " + tupleHandler.getCount()); + Validate.isTrue(tupleHandler.getCount() == 5 ); + + } + + private static void testGeoFreetextWithPCJSearch( + final SailRepositoryConnection conn) + throws MalformedQueryException, RepositoryException, + TupleQueryResultHandlerException, QueryEvaluationException { + // ring outside point + final String queryString = "PREFIX geo: <http://www.opengis.net/ont/geosparql#> "// + + "PREFIX fts: <http://rdf.useekm.com/fts#> "// + + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/> "// + + "SELECT ?feature ?point ?wkt ?e ?c ?l ?o ?person ?match " // + + "{" // + + " ?person a <http://example.org/ontology/Person> . "// + + " ?person <http://www.w3.org/2000/01/rdf-schema#label> ?match . "// + + " FILTER(fts:text(?match, \"!alice & hose\")) " // + + " ?e a ?c . "// + + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l . "// + + " ?e <uri:talksTo> ?o . "// + + " ?feature a geo:Feature . "// + + " ?feature geo:hasGeometry ?point . "// + + " ?point a geo:Point . "// + + " ?point geo:asWKT ?wkt . "// + + " FILTER(geof:sfWithin(?wkt, \"POLYGON((-78 39, -77 39, -77 38, -78 38, -78 39))\"^^geo:wktLiteral)) " // + + "}";// + final TupleQuery tupleQuery = conn.prepareTupleQuery( + QueryLanguage.SPARQL, queryString); + final CountingResultHandler tupleHandler = new CountingResultHandler(); + tupleQuery.evaluate(tupleHandler); + log.info("Result count : " + tupleHandler.getCount()); + Validate.isTrue(tupleHandler.getCount() == 0);// TODO ==1 some data is missing for this query! 
+ } + + + + private static void testDeleteGeoData(final SailRepositoryConnection conn) + throws Exception { + // Delete all stored points + final String sparqlDelete = "PREFIX geo: <http://www.opengis.net/ont/geosparql#> "// + + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/> "// + + "DELETE {\n" // + + " ?feature a geo:Feature . "// + + " ?feature geo:hasGeometry ?point . "// + + " ?point a geo:Point . "// + + " ?point geo:asWKT ?wkt . "// + + "}\n" + "WHERE { \n" + " ?feature a geo:Feature . "// + + " ?feature geo:hasGeometry ?point . "// + + " ?point a geo:Point . "// + + " ?point geo:asWKT ?wkt . "// + + "}";// + + final Update deleteUpdate = conn.prepareUpdate(QueryLanguage.SPARQL, + sparqlDelete); + deleteUpdate.execute(); + + String queryString; + TupleQuery tupleQuery; + CountingResultHandler tupleHandler; + + // Find all stored points + queryString = "PREFIX geo: <http://www.opengis.net/ont/geosparql#> "// + + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/> "// + + "SELECT ?feature ?point ?wkt " // + + "{" // + + " ?feature a geo:Feature . "// + + " ?feature geo:hasGeometry ?point . "// + + " ?point a geo:Point . "// + + " ?point geo:asWKT ?wkt . 
"// + + "}";// + tupleQuery = conn.prepareTupleQuery(QueryLanguage.SPARQL, queryString); + tupleHandler = new CountingResultHandler(); + tupleQuery.evaluate(tupleHandler); + log.info("Result count : " + tupleHandler.getCount()); + Validate.isTrue(tupleHandler.getCount() == 0); + } + + private static class CountingResultHandler implements + TupleQueryResultHandler { + private int count = 0; + + public int getCount() { + return count; + } + + public void resetCount() { + count = 0; + } + + @Override + public void startQueryResult(final List<String> arg0) + throws TupleQueryResultHandlerException { + } + + @Override + public void handleSolution(final BindingSet arg0) + throws TupleQueryResultHandlerException { + count++; + System.out.println(arg0); + } + + @Override + public void endQueryResult() throws TupleQueryResultHandlerException { + } + + @Override + public void handleBoolean(final boolean arg0) + throws QueryResultHandlerException { + } + + @Override + public void handleLinks(final List<String> arg0) + throws QueryResultHandlerException { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.geowave/src/test/java/org/apache/rya/indexing/accumulo/geo/GeoWaveFeatureReaderTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/geo.geowave/src/test/java/org/apache/rya/indexing/accumulo/geo/GeoWaveFeatureReaderTest.java b/extras/rya.geoindexing/geo.geowave/src/test/java/org/apache/rya/indexing/accumulo/geo/GeoWaveFeatureReaderTest.java new file mode 100644 index 0000000..f36a515 --- /dev/null +++ b/extras/rya.geoindexing/geo.geowave/src/test/java/org/apache/rya/indexing/accumulo/geo/GeoWaveFeatureReaderTest.java @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.accumulo.geo; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.indexing.GeoIndexerType; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.geotools.data.DataStore; +import org.geotools.data.DataUtilities; +import org.geotools.data.DefaultTransaction; +import org.geotools.data.DelegatingFeatureReader; +import org.geotools.data.FeatureReader; +import org.geotools.data.FeatureWriter; +import org.geotools.data.Query; +import org.geotools.data.Transaction; +import org.geotools.feature.SchemaException; +import org.geotools.feature.visitor.MaxVisitor; +import org.geotools.feature.visitor.MinVisitor; +import org.geotools.filter.FilterFactoryImpl; +import 
org.geotools.filter.text.cql2.CQLException; +import org.geotools.filter.text.ecql.ECQL; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.opengis.feature.simple.SimpleFeature; +import org.opengis.feature.simple.SimpleFeatureType; +import org.opengis.filter.Filter; + +import com.google.common.collect.Sets; +import com.vividsolutions.jts.geom.Coordinate; +import com.vividsolutions.jts.geom.GeometryFactory; +import com.vividsolutions.jts.geom.PrecisionModel; + +import mil.nga.giat.geowave.adapter.vector.plugin.GeoWaveFeatureReader; +import mil.nga.giat.geowave.adapter.vector.utils.DateUtilities; + +/** + * Tests the {@link FeatureReader} capabilities within the + * {@link GeoWaveGeoIndexer). + */ +public class GeoWaveFeatureReaderTest { + private DataStore dataStore; + private SimpleFeatureType type; + private final GeometryFactory factory = new GeometryFactory(new PrecisionModel(PrecisionModel.FIXED)); + private Query query = null; + private final List<String> fids = new ArrayList<>(); + private final List<String> pids = new ArrayList<>(); + private Date stime, etime; + + private AccumuloRdfConfiguration conf; + + private void setupConf() throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + conf = new AccumuloRdfConfiguration(); + conf.setTablePrefix("triplestore_"); + final String tableName = GeoWaveGeoIndexer.getTableName(conf); + conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, true); + conf.set(ConfigUtils.CLOUDBASE_USER, "USERNAME"); + conf.set(ConfigUtils.CLOUDBASE_PASSWORD, "PASS"); + conf.set(ConfigUtils.CLOUDBASE_INSTANCE, "INSTANCE"); + conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, "localhost"); + conf.set(ConfigUtils.CLOUDBASE_AUTHS, "U"); + conf.set(OptionalConfigUtils.USE_GEO, "true"); + conf.set(OptionalConfigUtils.GEO_INDEXER_TYPE, GeoIndexerType.GEO_WAVE.toString()); + + final TableOperations tops = ConfigUtils.getConnector(conf).tableOperations(); + // get all of the table names with the 
prefix + final Set<String> toDel = Sets.newHashSet(); + for (final String t : tops.list()) { + if (t.startsWith(tableName)) { + toDel.add(t); + } + } + for (final String t : toDel) { + tops.delete(t); + } + } + + @Before + public void setup() throws SchemaException, CQLException, Exception { + setupConf(); + try (final GeoWaveGeoIndexer indexer = new GeoWaveGeoIndexer()) { + indexer.setConf(conf); + dataStore = indexer.getGeoToolsDataStore(); + // Clear old data + indexer.purge(conf); + + type = DataUtilities.createType( + "GeoWaveFeatureReaderTest", + "geometry:Geometry:srid=4326,start:Date,end:Date,pop:java.lang.Long,pid:String"); + + dataStore.createSchema(type); + + stime = DateUtilities.parseISO("2005-05-15T20:32:56Z"); + etime = DateUtilities.parseISO("2005-05-20T20:32:56Z"); + + final Transaction transaction1 = new DefaultTransaction(); + final FeatureWriter<SimpleFeatureType, SimpleFeature> writer = dataStore.getFeatureWriter( + type.getTypeName(), + transaction1); + assertFalse(writer.hasNext()); + SimpleFeature newFeature = writer.next(); + newFeature.setAttribute( + "pop", + Long.valueOf(100)); + newFeature.setAttribute( + "pid", + "a" + UUID.randomUUID().toString()); + newFeature.setAttribute( + "start", + stime); + newFeature.setAttribute( + "end", + etime); + newFeature.setAttribute( + "geometry", + factory.createPoint(new Coordinate(27.25, 41.25))); + fids.add(newFeature.getID()); + pids.add(newFeature.getAttribute("pid").toString()); + writer.write(); + newFeature = writer.next(); + newFeature.setAttribute( + "pop", + Long.valueOf(101)); + newFeature.setAttribute( + "pid", + "b" + UUID.randomUUID().toString()); + newFeature.setAttribute( + "start", + etime); + newFeature.setAttribute( + "geometry", + factory.createPoint(new Coordinate(28.25, 41.25))); + fids.add(newFeature.getID()); + pids.add(newFeature.getAttribute("pid").toString()); + writer.write(); + writer.close(); + transaction1.commit(); + transaction1.close(); + + query = new Query( + 
"GeoWaveFeatureReaderTest", + ECQL.toFilter("IN ('" + fids.get(0) + "')"), + new String[] { + "geometry", + "pid" + }); + } + } + + @After + public void tearDown() { + dataStore.dispose(); + } + + @Test + public void testFID() throws IllegalArgumentException, NoSuchElementException, IOException, CQLException { + final FeatureReader<SimpleFeatureType, SimpleFeature> reader = + dataStore.getFeatureReader(query, Transaction.AUTO_COMMIT); + int count = 0; + while (reader.hasNext()) { + final SimpleFeature feature = reader.next(); + assertTrue(fids.contains(feature.getID())); + count++; + } + assertTrue(count > 0); + } + + @Test + public void testFidFilterQuery() throws IllegalArgumentException, NoSuchElementException, IOException, CQLException { + final String fidsString = fids.stream().collect(Collectors.joining("','", "'", "'")); + final Filter filter = ECQL.toFilter("IN (" + fidsString + ")"); + final Query query = new Query( + "GeoWaveFeatureReaderTest", + filter, + new String[] { + "geometry", + "pid" + }); + final FeatureReader<SimpleFeatureType, SimpleFeature> reader = + dataStore.getFeatureReader(query, Transaction.AUTO_COMMIT); + int count = 0; + while (reader.hasNext()) { + final SimpleFeature feature = reader.next(); + assertTrue(fids.contains(feature.getID())); + count++; + } + assertTrue(count == fids.size()); + } + + @Test + public void testPidFilterQuery() throws IllegalArgumentException, NoSuchElementException, IOException, CQLException { + // Filter it so that it only queries for everything but the first pid. + // There's only 2 pids total so it should just return the second one. 
+ final String pidsString = pids.subList(1, pids.size()).stream().collect(Collectors.joining("','", "'", "'")); + final Filter filter = ECQL.toFilter("pid IN (" + pidsString + ")"); + final Query query = new Query( + "GeoWaveFeatureReaderTest", + filter, + new String[] { + "geometry", + "pid" + }); + final FeatureReader<SimpleFeatureType, SimpleFeature> reader = + dataStore.getFeatureReader(query, Transaction.AUTO_COMMIT); + int count = 0; + while (reader.hasNext()) { + final SimpleFeature feature = reader.next(); + assertTrue(fids.contains(feature.getID())); + count++; + } + assertTrue(count == pids.size() - 1); + } + + + @Test + public void testBBOX() throws IllegalArgumentException, NoSuchElementException, IOException { + final FilterFactoryImpl factory = new FilterFactoryImpl(); + final Query query = new Query( + "GeoWaveFeatureReaderTest", + factory.bbox( + "", + -180, + -90, + 180, + 90, + "EPSG:4326"), + new String[] { + "geometry", + "pid" + }); + + final FeatureReader<SimpleFeatureType, SimpleFeature> reader = + dataStore.getFeatureReader(query, Transaction.AUTO_COMMIT); + int count = 0; + while (reader.hasNext()) { + final SimpleFeature feature = reader.next(); + assertTrue(fids.contains(feature.getID())); + count++; + } + assertTrue(count > 0); + } + + @Test + public void testRangeIndex() throws IllegalArgumentException, NoSuchElementException, IOException { + final FeatureReader<SimpleFeatureType, SimpleFeature> reader = + dataStore.getFeatureReader(query, Transaction.AUTO_COMMIT); + int count = 0; + while (reader.hasNext()) { + final SimpleFeature feature = reader.next(); + assertTrue(fids.contains(feature.getID())); + count++; + } + assertEquals(1, count); + } + + @Test + public void testLike() throws IllegalArgumentException, NoSuchElementException, IOException, CQLException { + final Query query = new Query( + "GeoWaveFeatureReaderTest", + ECQL.toFilter("pid like '" + pids.get( + 0).substring( + 0, + 1) + "%'"), + new String[] { + "geometry", + 
"pid" + }); + final FeatureReader<SimpleFeatureType, SimpleFeature> reader = + dataStore.getFeatureReader(query, Transaction.AUTO_COMMIT); + int count = 0; + while (reader.hasNext()) { + final SimpleFeature feature = reader.next(); + assertTrue(fids.contains(feature.getID())); + count++; + } + assertEquals(1, count); + } + + @Test + public void testRemoveFeature() throws IllegalArgumentException, NoSuchElementException, IOException, CQLException { + final Query query = new Query( + "GeoWaveFeatureReaderTest", + ECQL.toFilter("pid like '" + pids.get( + 0).substring( + 0, + 1) + "%'"), + new String[] { + "geometry", + "pid" + }); + final FeatureReader<SimpleFeatureType, SimpleFeature> reader = + dataStore.getFeatureReader(query, Transaction.AUTO_COMMIT); + int count = 0; + while (reader.hasNext()) { + final SimpleFeature feature = reader.next(); + assertTrue(fids.contains(feature.getID())); + count++; + } + assertEquals(1, count); + + // Remove + final FeatureWriter<SimpleFeatureType, SimpleFeature> writer = + dataStore.getFeatureWriter(type.getTypeName(), Transaction.AUTO_COMMIT); + try { + while (writer.hasNext()) { + writer.next(); + writer.remove(); + } + } finally { + writer.close(); + } + + // Re-query + final FeatureReader<SimpleFeatureType, SimpleFeature> reader2 = + dataStore.getFeatureReader(query, Transaction.AUTO_COMMIT); + int recount = 0; + while (reader2.hasNext()) { + reader2.next(); + recount++; + } + assertEquals(0, recount); + } + + @Test + public void testMax() throws IllegalArgumentException, NoSuchElementException, IOException { + final FeatureReader<SimpleFeatureType, SimpleFeature> reader = + dataStore.getFeatureReader(query, Transaction.AUTO_COMMIT); + final MaxVisitor visitor = new MaxVisitor("start", type); + unwrapDelegatingFeatureReader(reader).getFeatureCollection().accepts(visitor, null); + assertTrue(visitor.getMax().equals(etime)); + } + + @Test + public void testMin() throws IllegalArgumentException, NoSuchElementException, 
IOException { + final FeatureReader<SimpleFeatureType, SimpleFeature> reader = + dataStore.getFeatureReader(query, Transaction.AUTO_COMMIT); + final MinVisitor visitor = new MinVisitor("start", type); + unwrapDelegatingFeatureReader(reader).getFeatureCollection().accepts(visitor, null); + assertTrue(visitor.getMin().equals(stime)); + } + + private GeoWaveFeatureReader unwrapDelegatingFeatureReader(final FeatureReader<SimpleFeatureType, SimpleFeature> reader ) { + // GeoTools uses decorator pattern to wrap FeatureReaders + // we need to get down to the inner GeoWaveFeatureReader + FeatureReader<SimpleFeatureType, SimpleFeature> currReader = reader; + while (!(currReader instanceof GeoWaveFeatureReader)) { + currReader = ((DelegatingFeatureReader<SimpleFeatureType, SimpleFeature>) currReader).getDelegate(); + } + return (GeoWaveFeatureReader) currReader; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.geowave/src/test/java/org/apache/rya/indexing/accumulo/geo/GeoWaveGTQueryTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/geo.geowave/src/test/java/org/apache/rya/indexing/accumulo/geo/GeoWaveGTQueryTest.java b/extras/rya.geoindexing/geo.geowave/src/test/java/org/apache/rya/indexing/accumulo/geo/GeoWaveGTQueryTest.java new file mode 100644 index 0000000..778b5ef --- /dev/null +++ b/extras/rya.geoindexing/geo.geowave/src/test/java/org/apache/rya/indexing/accumulo/geo/GeoWaveGTQueryTest.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.rya.indexing.accumulo.geo;

import static org.junit.Assert.assertEquals;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
import org.apache.commons.io.FileUtils;
import org.geotools.feature.AttributeTypeBuilder;
import org.geotools.feature.simple.SimpleFeatureBuilder;
import org.geotools.feature.simple.SimpleFeatureTypeBuilder;
import org.geotools.filter.text.cql2.CQLException;
import org.geotools.filter.text.ecql.ECQL;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.simple.SimpleFeatureType;
import org.opengis.filter.Filter;

import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import com.vividsolutions.jts.geom.Coordinate;
import com.vividsolutions.jts.geom.Envelope;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.Polygon;

import mil.nga.giat.geowave.adapter.vector.FeatureDataAdapter;
import mil.nga.giat.geowave.adapter.vector.query.cql.CQLQuery;
import mil.nga.giat.geowave.core.geotime.GeometryUtils;
import mil.nga.giat.geowave.core.geotime.ingest.SpatialDimensionalityTypeProvider;
import mil.nga.giat.geowave.core.geotime.store.query.SpatialQuery;
import mil.nga.giat.geowave.core.store.CloseableIterator;
import mil.nga.giat.geowave.core.store.DataStore;
import mil.nga.giat.geowave.core.store.IndexWriter;
import mil.nga.giat.geowave.core.store.index.PrimaryIndex;
import mil.nga.giat.geowave.core.store.query.QueryOptions;
import mil.nga.giat.geowave.datastore.accumulo.AccumuloDataStore;
import mil.nga.giat.geowave.datastore.accumulo.BasicAccumuloOperations;
import mil.nga.giat.geowave.datastore.accumulo.minicluster.MiniAccumuloClusterFactory;

/**
 * This class is intended to provide a self-contained, easy-to-follow example of
 * a few GeoTools queries against GeoWave. For simplicity, a MiniAccumuloCluster
 * is spun up and a few points from the DC area are ingested (Washington
 * Monument, White House, FedEx Field). Two queries are executed against this
 * data set.
 */
public class GeoWaveGTQueryTest {
    private static File tempAccumuloDir;
    private static MiniAccumuloClusterImpl accumulo;
    private static DataStore dataStore;

    private static final PrimaryIndex INDEX = new SpatialDimensionalityTypeProvider().createPrimaryIndex();

    // Points (to be ingested into GeoWave Data Store)
    private static final Coordinate WASHINGTON_MONUMENT = new Coordinate(-77.0352, 38.8895);
    private static final Coordinate WHITE_HOUSE = new Coordinate(-77.0366, 38.8977);
    private static final Coordinate FEDEX_FIELD = new Coordinate(-76.8644, 38.9078);

    // cities used to construct Geometries for queries
    private static final Coordinate BALTIMORE = new Coordinate(-76.6167, 39.2833);
    private static final Coordinate RICHMOND = new Coordinate(-77.4667, 37.5333);
    private static final Coordinate HARRISONBURG = new Coordinate(-78.8689, 38.4496);

    private static final Map<String, Coordinate> CANNED_DATA = ImmutableMap.of(
            "Washington Monument", WASHINGTON_MONUMENT,
            "White House", WHITE_HOUSE,
            "FedEx Field", FEDEX_FIELD
    );

    private static final FeatureDataAdapter ADAPTER = new FeatureDataAdapter(getPointSimpleFeatureType());

    private static final String ACCUMULO_USER = "root";
    private static final String ACCUMULO_PASSWORD = "password";
    private static final String TABLE_NAMESPACE = "";

    /**
     * Spins up a MiniAccumuloCluster in a temp directory, opens a GeoWave
     * data store against it, and ingests the canned points.
     */
    @BeforeClass
    public static void setup() throws AccumuloException, AccumuloSecurityException, IOException, InterruptedException {
        tempAccumuloDir = Files.createTempDir();

        accumulo = MiniAccumuloClusterFactory.newAccumuloCluster(
                new MiniAccumuloConfigImpl(tempAccumuloDir, ACCUMULO_PASSWORD),
                GeoWaveGTQueryTest.class);

        accumulo.start();

        dataStore = new AccumuloDataStore(
                new BasicAccumuloOperations(
                        accumulo.getZooKeepers(),
                        accumulo.getInstanceName(),
                        ACCUMULO_USER,
                        ACCUMULO_PASSWORD,
                        TABLE_NAMESPACE));

        ingestCannedData();
    }

    /** Writes each canned point into the spatial index through a single IndexWriter. */
    private static void ingestCannedData() throws IOException {
        final List<SimpleFeature> points = new ArrayList<>();

        System.out.println("Building SimpleFeatures from canned data set...");

        for (final Entry<String, Coordinate> entry : CANNED_DATA.entrySet()) {
            System.out.println("Added point: " + entry.getKey());
            points.add(buildSimpleFeature(entry.getKey(), entry.getValue()));
        }

        System.out.println("Ingesting canned data...");

        try (final IndexWriter<SimpleFeature> indexWriter = dataStore.createWriter(ADAPTER, INDEX)) {
            for (final SimpleFeature sf : points) {
                indexWriter.write(sf);
            }
        }

        System.out.println("Ingest complete.");
    }

    /** CQL query combining a bounding box with an attribute LIKE filter. */
    @Test
    public void executeCQLQueryTest() throws IOException, CQLException {
        System.out.println("Executing query, expecting to match two points...");

        final Filter cqlFilter = ECQL.toFilter("BBOX(geometry,-77.6167,38.6833,-76.6,38.9200) and locationName like 'W%'");

        final QueryOptions queryOptions = new QueryOptions(ADAPTER, INDEX);
        final CQLQuery cqlQuery = new CQLQuery(null, cqlFilter, ADAPTER);

        try (final CloseableIterator<SimpleFeature> iterator = dataStore.query(queryOptions, cqlQuery)) {
            int count = 0;
            while (iterator.hasNext()) {
                System.out.println("Query match: " + iterator.next().getID());
                count++;
            }
            System.out.println("executeCQLQueryTest count: " + count);
            // Should match "Washington Monument" and "White House"
            assertEquals(2, count);
        }
    }

    /** Spatial query using the envelope spanned by Baltimore and Richmond. */
    @Test
    public void executeBoundingBoxQueryTest() throws IOException {
        System.out.println("Constructing bounding box for the area contained by [Baltimore, MD and Richmond, VA.");

        final Geometry boundingBox = GeometryUtils.GEOMETRY_FACTORY.toGeometry(new Envelope(
                BALTIMORE,
                RICHMOND));

        System.out.println("Executing query, expecting to match ALL points...");

        final QueryOptions queryOptions = new QueryOptions(ADAPTER, INDEX);
        final SpatialQuery spatialQuery = new SpatialQuery(boundingBox);

        try (final CloseableIterator<SimpleFeature> iterator = dataStore.query(queryOptions, spatialQuery)) {
            int count = 0;
            while (iterator.hasNext()) {
                System.out.println("Query match: " + iterator.next().getID());
                count++;
            }
            System.out.println("executeBoundingBoxQueryTest count: " + count);
            // Should match "FedEx Field", "Washington Monument", and "White House"
            assertEquals(3, count);
        }
    }

    /** Spatial query using a triangular polygon covering all three points. */
    @Test
    public void executePolygonQueryTest() throws IOException {
        System.out.println("Constructing polygon for the area contained by [Baltimore, MD; Richmond, VA; Harrisonburg, VA].");

        final Polygon polygon = GeometryUtils.GEOMETRY_FACTORY.createPolygon(new Coordinate[] {
                BALTIMORE,
                RICHMOND,
                HARRISONBURG,
                BALTIMORE
        });

        System.out.println("Executing query, expecting to match ALL points...");

        final QueryOptions queryOptions = new QueryOptions(ADAPTER, INDEX);
        final SpatialQuery spatialQuery = new SpatialQuery(polygon);

        /*
         * NOTICE: In this query, the adapter is added to the query options. If
         * an index has data from more than one adapter, the data associated
         * with a specific adapter can be selected.
         */
        try (final CloseableIterator<SimpleFeature> closableIterator = dataStore.query(queryOptions, spatialQuery)) {
            int count = 0;
            while (closableIterator.hasNext()) {
                System.out.println("Query match: " + closableIterator.next().getID());
                count++;
            }
            System.out.println("executePolygonQueryTest count: " + count);
            // Should match "FedEx Field", "Washington Monument", and "White House"
            assertEquals(3, count);
        }
    }

    @AfterClass
    public static void cleanup() throws IOException, InterruptedException {
        try {
            // Null guards: if setup() failed before these were assigned, the
            // original code threw NPE here and masked the real failure.
            if (accumulo != null) {
                accumulo.stop();
            }
        } finally {
            if (tempAccumuloDir != null) {
                FileUtils.deleteDirectory(tempAccumuloDir);
            }
        }
    }

    /** Builds the two-attribute (locationName, geometry) feature type used throughout. */
    private static SimpleFeatureType getPointSimpleFeatureType() {
        final String name = "PointSimpleFeatureType";
        final SimpleFeatureTypeBuilder sftBuilder = new SimpleFeatureTypeBuilder();
        final AttributeTypeBuilder atBuilder = new AttributeTypeBuilder();
        sftBuilder.setName(name);
        sftBuilder.add(atBuilder.binding(String.class).nillable(false)
                .buildDescriptor("locationName"));
        sftBuilder.add(atBuilder.binding(Geometry.class).nillable(false)
                .buildDescriptor("geometry"));

        return sftBuilder.buildFeatureType();
    }

    /** Builds a point feature whose id is the location name itself. */
    private static SimpleFeature buildSimpleFeature(final String locationName, final Coordinate coordinate) {
        final SimpleFeatureBuilder builder = new SimpleFeatureBuilder(getPointSimpleFeatureType());
        builder.set("locationName", locationName);
        builder.set("geometry", GeometryUtils.GEOMETRY_FACTORY.createPoint(coordinate));

        return builder.buildFeature(locationName);
    }
}
