RYA-241; Closes #139. Add a GeoWave indexer to rya.geoindexing as a configurable alternative to GeoMesa.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/fe6cbccf Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/fe6cbccf Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/fe6cbccf Branch: refs/heads/master Commit: fe6cbccf77e9ef2057ddeb62d6314969f86cda88 Parents: c898985 Author: eric.white <[email protected]> Authored: Fri Jan 27 10:46:08 2017 -0500 Committer: pujav65 <[email protected]> Committed: Mon Mar 13 10:05:54 2017 -0400 ---------------------------------------------------------------------- extras/indexing/pom.xml | 1 + extras/indexingExample/README.md | 26 + .../src/main/assembly/assembly.xml | 11 +- extras/rya.geoindexing/pom.xml | 12 + .../org/apache/rya/indexing/GeoIndexerType.java | 61 ++ .../rya/indexing/OptionalConfigUtils.java | 86 +-- .../accumulo/geo/GeoMesaGeoIndexer.java | 54 +- .../accumulo/geo/GeoWaveGeoIndexer.java | 661 +++++++++++++++++++ .../rya/indexing/GeoIndexingTestUtils.java | 51 ++ .../indexing/accumulo/geo/GeoIndexerSfTest.java | 273 ++++---- .../indexing/accumulo/geo/GeoIndexerTest.java | 273 ++++---- .../accumulo/geo/GeoWaveFeatureReaderTest.java | 385 +++++++++++ .../accumulo/geo/GeoWaveGTQueryTest.java | 254 +++++++ .../accumulo/geo/GeoWaveIndexerSfTest.java | 537 +++++++++++++++ .../accumulo/geo/GeoWaveIndexerTest.java | 448 +++++++++++++ .../indexing/mongo/MongoGeoIndexerSfTest.java | 33 +- .../rya/indexing/mongo/MongoGeoIndexerTest.java | 30 +- pom.xml | 32 +- 18 files changed, 2828 insertions(+), 400 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fe6cbccf/extras/indexing/pom.xml ---------------------------------------------------------------------- diff --git a/extras/indexing/pom.xml b/extras/indexing/pom.xml index 237c018..e38e75d 100644 --- a/extras/indexing/pom.xml +++ b/extras/indexing/pom.xml @@ -167,6 +167,7 @@ 
<artifactSet> <excludes> <exclude>org.locationtech.geomesa:*</exclude> + <exclude>mil.nga.giat:*</exclude> <exclude>scala:*</exclude> <exclude>org.apache.accumulo:*</exclude> <exclude>org.apache.thrift:*</exclude> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fe6cbccf/extras/indexingExample/README.md ---------------------------------------------------------------------- diff --git a/extras/indexingExample/README.md b/extras/indexingExample/README.md new file mode 100644 index 0000000..16832ba --- /dev/null +++ b/extras/indexingExample/README.md @@ -0,0 +1,26 @@ +<!-- Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. --> + +# Rya Indexing Example + +## Install Instructions + +### GeoMesa Installation +GeoMesa download and install instructions can be found [here](http://www.geomesa.org/documentation/user/accumulo/install.html). + +### GeoWave Installation +GeoWave download and install instructions can be found [here](http://ngageoint.github.io/geowave/packages.html). 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fe6cbccf/extras/indexingExample/src/main/assembly/assembly.xml ---------------------------------------------------------------------- diff --git a/extras/indexingExample/src/main/assembly/assembly.xml b/extras/indexingExample/src/main/assembly/assembly.xml index 140945d..4534f41 100644 --- a/extras/indexingExample/src/main/assembly/assembly.xml +++ b/extras/indexingExample/src/main/assembly/assembly.xml @@ -32,7 +32,6 @@ under the License. <outputDirectory>accumulo/lib/ext</outputDirectory> <includes> <include>org.apache.rya:rya.indexing:*:accumulo-server</include> - <include>org.locationtech.geomesa:geomesa-accumulo-distributed-runtime:*</include> </includes> </dependencySet> <dependencySet> @@ -67,8 +66,8 @@ under the License. <outputDirectory>dist</outputDirectory> </file> </files> - <!-- Add Apache licenses to the distribution zip --> <fileSets> + <!-- Add Apache licenses to the distribution zip --> <fileSet> <directory>${project.basedir}/../../</directory> <outputDirectory></outputDirectory> @@ -77,5 +76,13 @@ under the License. 
<include>NOTICE*</include> </includes> </fileSet> + <!-- Add instructions to the distribution zip --> + <fileSet> + <directory>${project.basedir}</directory> + <outputDirectory></outputDirectory> + <includes> + <include>README.md</include> + </includes> + </fileSet> </fileSets> </assembly> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fe6cbccf/extras/rya.geoindexing/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/pom.xml b/extras/rya.geoindexing/pom.xml index 4e81808..e27b0b8 100644 --- a/extras/rya.geoindexing/pom.xml +++ b/extras/rya.geoindexing/pom.xml @@ -90,6 +90,17 @@ <artifactId>geomesa-accumulo-datastore_2.11</artifactId> </dependency> + <dependency> + <groupId>mil.nga.giat</groupId> + <artifactId>geowave-datastore-accumulo</artifactId> + <version>${geowave.version}</version> + </dependency> + + <dependency> + <groupId>mil.nga.giat</groupId> + <artifactId>geowave-adapter-vector</artifactId> + <version>${geowave.version}</version> + </dependency> <dependency> <groupId>junit</groupId> @@ -184,6 +195,7 @@ <artifactSet> <excludes> <exclude>org.locationtech.geomesa:*</exclude> + <exclude>mil.nga.giat:*</exclude> <exclude>scala:*</exclude> <exclude>org.apache.accumulo:*</exclude> <exclude>org.apache.thrift:*</exclude> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fe6cbccf/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/GeoIndexerType.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/GeoIndexerType.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/GeoIndexerType.java new file mode 100644 index 0000000..1af51b0 --- /dev/null +++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/GeoIndexerType.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.rya.indexing.accumulo.geo.GeoMesaGeoIndexer; +import org.apache.rya.indexing.accumulo.geo.GeoWaveGeoIndexer; +import org.apache.rya.indexing.mongodb.geo.MongoGeoIndexer; + +/** + * A list of all the types of Geo indexers supported in Rya. + */ +public enum GeoIndexerType { + /** + * Geo Mesa based indexer. + */ + GEO_MESA(GeoMesaGeoIndexer.class), + /** + * Geo Wave based indexer. + */ + GEO_WAVE(GeoWaveGeoIndexer.class), + /** + * MongoDB based indexer. + */ + MONGO_DB(MongoGeoIndexer.class); + + private Class<? extends GeoIndexer> geoIndexerClass; + + /** + * Creates a new {@link GeoIndexerType}. + * @param geoIndexerClass the {@link GeoIndexer} {@link Class}. + * (not {@code null}) + */ + private GeoIndexerType(final Class<? extends GeoIndexer> geoIndexerClass) { + this.geoIndexerClass = checkNotNull(geoIndexerClass); + } + + /** + * @return the {@link GeoIndexer} {@link Class}. (not {@code null}) + */ + public Class<? 
extends GeoIndexer> getGeoIndexerClass() { + return geoIndexerClass; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fe6cbccf/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/OptionalConfigUtils.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/OptionalConfigUtils.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/OptionalConfigUtils.java index b10e824..dd6ea40 100644 --- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/OptionalConfigUtils.java +++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/OptionalConfigUtils.java @@ -1,5 +1,3 @@ -package org.apache.rya.indexing; - /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -18,59 +16,28 @@ package org.apache.rya.indexing; * specific language governing permissions and limitations * under the License. 
*/ +package org.apache.rya.indexing; -import static java.util.Objects.requireNonNull; - -import java.util.HashSet; import java.util.List; import java.util.Set; -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchScanner; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.MultiTableBatchWriter; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableExistsException; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.client.admin.TableOperations; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.security.Authorizations; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.log4j.Logger; -import org.openrdf.model.URI; -import org.openrdf.model.impl.URIImpl; - -import com.google.common.base.Optional; -import com.google.common.collect.Lists; - import org.apache.rya.accumulo.AccumuloRdfConfiguration; import org.apache.rya.api.RdfCloudTripleStoreConfiguration; import org.apache.rya.api.instance.RyaDetails; -import org.apache.rya.indexing.GeoEnabledFilterFunctionOptimizer; import org.apache.rya.indexing.accumulo.ConfigUtils; -import org.apache.rya.indexing.accumulo.entity.EntityCentricIndex; -import org.apache.rya.indexing.accumulo.entity.EntityOptimizer; -import org.apache.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer; -import org.apache.rya.indexing.accumulo.freetext.LuceneTokenizer; -import org.apache.rya.indexing.accumulo.freetext.Tokenizer; import org.apache.rya.indexing.accumulo.geo.GeoMesaGeoIndexer; -import 
org.apache.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer; -import org.apache.rya.indexing.external.PrecomputedJoinIndexer; -import org.apache.rya.indexing.mongodb.freetext.MongoFreeTextIndexer; import org.apache.rya.indexing.mongodb.geo.MongoGeoIndexer; -import org.apache.rya.indexing.pcj.matching.PCJOptimizer; +import org.openrdf.model.URI; + +import com.google.common.collect.Lists; /** * A set of configuration utils to read a Hadoop {@link Configuration} object and create Cloudbase/Accumulo objects. - * Soon will deprecate this class. Use installer for the set methods, use {@link RyaDetails} for the get methods. + * Soon will deprecate this class. Use installer for the set methods, use {@link RyaDetails} for the get methods. * New code must separate parameters that are set at Rya install time from that which is specific to the client. - * Also Accumulo index tables are pushed down to the implementation and not configured in conf. + * Also Accumulo index tables are pushed down to the implementation and not configured in conf. */ public class OptionalConfigUtils extends ConfigUtils { private static final Logger logger = Logger.getLogger(OptionalConfigUtils.class); @@ -86,6 +53,7 @@ public class OptionalConfigUtils extends ConfigUtils { public static final String USE_OPTIMAL_PCJ = "sc.use.optimal.pcj"; public static final String USE_PCJ_UPDATER_INDEX = "sc.use.updater"; public static final String GEO_PREDICATES_LIST = "sc.geo.predicates"; + public static final String GEO_INDEXER_TYPE = "sc.geo.geo_indexer_type"; public static Set<URI> getGeoPredicates(final Configuration conf) { return getPredicates(conf, GEO_PREDICATES_LIST); @@ -99,43 +67,59 @@ public class OptionalConfigUtils extends ConfigUtils { return conf.getBoolean(USE_GEO, false); } + /** + * Retrieves the value for the geo indexer type from the config. + * @param conf the {@link Configuration}. + * @return the {@link GeoIndexerType} found in the config or + * {@code null} if it doesn't exist. 
+ */ + public static GeoIndexerType getGeoIndexerType(final Configuration conf) { + return conf.getEnum(GEO_INDEXER_TYPE, null); + } public static void setIndexers(final RdfCloudTripleStoreConfiguration conf) { - final List<String> indexList = Lists.newArrayList(); final List<String> optimizers = Lists.newArrayList(); boolean useFilterIndex = false; ConfigUtils.setIndexers(conf); - for (String index : conf.getStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS)){ - indexList.add(index); + for (final String index : conf.getStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS)){ + indexList.add(index); } - for (String optimizer : conf.getStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS)){ - optimizers.add(optimizer); + for (final String optimizer : conf.getStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS)){ + optimizers.add(optimizer); } + final GeoIndexerType geoIndexerType = getGeoIndexerType(conf); + if (ConfigUtils.getUseMongo(conf)) { if (getUseGeo(conf)) { - indexList.add(MongoGeoIndexer.class.getName()); + if (geoIndexerType == null) { + // Default to MongoGeoIndexer if not specified + indexList.add(MongoGeoIndexer.class.getName()); + } else { + indexList.add(geoIndexerType.getGeoIndexerClass().getName()); + } useFilterIndex = true; } } else { if (getUseGeo(conf)) { - indexList.add(GeoMesaGeoIndexer.class.getName()); + if (geoIndexerType == null) { + // Default to GeoMesaGeoIndexer if not specified + indexList.add(GeoMesaGeoIndexer.class.getName()); + } else { + indexList.add(geoIndexerType.getGeoIndexerClass().getName()); + } useFilterIndex = true; } } if (useFilterIndex) { - optimizers.remove(FilterFunctionOptimizer.class.getName()); + optimizers.remove(FilterFunctionOptimizer.class.getName()); optimizers.add(GeoEnabledFilterFunctionOptimizer.class.getName()); } conf.setStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS, indexList.toArray(new String[]{})); conf.setStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS, 
optimizers.toArray(new String[]{})); - } - - - } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fe6cbccf/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoMesaGeoIndexer.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoMesaGeoIndexer.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoMesaGeoIndexer.java index 0b7a7d0..1956355 100644 --- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoMesaGeoIndexer.java +++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoMesaGeoIndexer.java @@ -62,7 +62,6 @@ import org.geotools.feature.SchemaException; import org.geotools.feature.simple.SimpleFeatureBuilder; import org.geotools.filter.text.cql2.CQLException; import org.geotools.filter.text.ecql.ECQL; -import org.locationtech.geomesa.accumulo.data.AccumuloDataStore; import org.locationtech.geomesa.accumulo.index.Constants; import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes; import org.opengis.feature.simple.SimpleFeature; @@ -133,6 +132,7 @@ public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoInd private static final String PREDICATE_ATTRIBUTE = "P"; private static final String OBJECT_ATTRIBUTE = "O"; private static final String CONTEXT_ATTRIBUTE = "C"; + private static final String GEOMETRY_ATTRIBUTE = Constants.SF_PROPERTY_GEOMETRY; private Set<URI> validPredicates; private Configuration conf; @@ -147,7 +147,7 @@ public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoInd this.conf = conf; if (!isInit) { try { - initInternal(); + initInternal(); isInit = true; } catch (final IOException e) { logger.warn("Unable to initialize index. Throwing Runtime Exception. 
", e); @@ -169,9 +169,7 @@ public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoInd try { featureType = getStatementFeatureType(dataStore); - } catch (final IOException e) { - throw new IOException(e); - } catch (final SchemaException e) { + } catch (final IOException | SchemaException e) { throw new IOException(e); } @@ -222,7 +220,7 @@ public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoInd + PREDICATE_ATTRIBUTE + ":String," // + OBJECT_ATTRIBUTE + ":String," // + CONTEXT_ATTRIBUTE + ":String," // - + Constants.SF_PROPERTY_GEOMETRY + ":Geometry:srid=4326;geomesa.mixed.geometries='true'"; + + GEOMETRY_ATTRIBUTE + ":Geometry:srid=4326;geomesa.mixed.geometries='true'"; featureType = SimpleFeatureTypes.createType(FEATURE_NAME, featureSchema); dataStore.createSchema(featureType); } @@ -255,7 +253,6 @@ public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoInd } } - @Override public void storeStatement(final RyaStatement statement) throws IOException { storeStatements(Collections.singleton(statement)); @@ -297,7 +294,7 @@ public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoInd final StatementConstraints contraints) { final List<String> filterParms = new ArrayList<String>(); - filterParms.add(type + "(" + Constants.SF_PROPERTY_GEOMETRY + ", " + geometry + " )"); + filterParms.add(type + "(" + GEOMETRY_ATTRIBUTE + ", " + geometry + " )"); if (contraints.hasSubject()) { filterParms.add("( " + SUBJECT_ATTRIBUTE + "= '" + contraints.getSubject() + "') "); @@ -342,7 +339,6 @@ public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoInd logger.error("Error performing query: " + filterString, e); throw new QueryEvaluationException(e); } - } return featureIterator; } @@ -495,33 +491,23 @@ public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoInd deleteStatements(Collections.singleton(statement)); } - @Override - public void init() { - // 
TODO Auto-generated method stub - - } - - @Override - public void setConnector(final Connector connector) { - // TODO Auto-generated method stub - - } - - @Override - public void destroy() { - // TODO Auto-generated method stub - - } + @Override + public void init() { + } - @Override - public void purge(final RdfCloudTripleStoreConfiguration configuration) { - // TODO Auto-generated method stub + @Override + public void setConnector(final Connector connector) { + } - } + @Override + public void destroy() { + } - @Override - public void dropAndDestroy() { - // TODO Auto-generated method stub + @Override + public void purge(final RdfCloudTripleStoreConfiguration configuration) { + } - } + @Override + public void dropAndDestroy() { + } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fe6cbccf/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoWaveGeoIndexer.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoWaveGeoIndexer.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoWaveGeoIndexer.java new file mode 100644 index 0000000..520ae81 --- /dev/null +++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoWaveGeoIndexer.java @@ -0,0 +1,661 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.accumulo.geo; + +import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; +import org.apache.rya.accumulo.experimental.AbstractAccumuloIndexer; +import org.apache.rya.api.RdfCloudTripleStoreConfiguration; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.resolver.RyaToRdfConversions; +import org.apache.rya.indexing.GeoIndexer; +import org.apache.rya.indexing.Md5Hash; +import org.apache.rya.indexing.StatementConstraints; +import org.apache.rya.indexing.StatementSerializer; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.geotools.data.DataStore; +import org.geotools.data.DataUtilities; +import org.geotools.data.FeatureSource; +import org.geotools.data.FeatureStore; +import org.geotools.factory.CommonFactoryFinder; +import org.geotools.factory.Hints; +import org.geotools.feature.DefaultFeatureCollection; 
+import org.geotools.feature.SchemaException; +import org.geotools.feature.simple.SimpleFeatureBuilder; +import org.geotools.filter.text.cql2.CQLException; +import org.geotools.filter.text.ecql.ECQL; +import org.opengis.feature.simple.SimpleFeature; +import org.opengis.feature.simple.SimpleFeatureType; +import org.opengis.filter.Filter; +import org.opengis.filter.FilterFactory; +import org.opengis.filter.identity.Identifier; +import org.openrdf.model.Literal; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.query.QueryEvaluationException; + +import com.vividsolutions.jts.geom.Geometry; +import com.vividsolutions.jts.io.ParseException; + +import info.aduna.iteration.CloseableIteration; +import mil.nga.giat.geowave.adapter.vector.FeatureDataAdapter; +import mil.nga.giat.geowave.adapter.vector.plugin.GeoWaveGTDataStore; +import mil.nga.giat.geowave.adapter.vector.plugin.GeoWaveGTDataStoreFactory; +import mil.nga.giat.geowave.adapter.vector.plugin.GeoWavePluginException; +import mil.nga.giat.geowave.adapter.vector.query.cql.CQLQuery; +import mil.nga.giat.geowave.core.geotime.ingest.SpatialDimensionalityTypeProvider; +import mil.nga.giat.geowave.core.store.CloseableIterator; +import mil.nga.giat.geowave.core.store.StoreFactoryFamilySpi; +import mil.nga.giat.geowave.core.store.index.PrimaryIndex; +import mil.nga.giat.geowave.core.store.memory.MemoryStoreFactoryFamily; +import mil.nga.giat.geowave.core.store.query.EverythingQuery; +import mil.nga.giat.geowave.core.store.query.QueryOptions; +import mil.nga.giat.geowave.datastore.accumulo.AccumuloDataStore; +import mil.nga.giat.geowave.datastore.accumulo.AccumuloStoreFactoryFamily; + +/** + * A {@link GeoIndexer} wrapper around a GeoWave {@link AccumuloDataStore}. This class configures and connects to the Datastore, creates the + * RDF Feature Type, and interacts with the Datastore. 
+ * <p> + * Specifically, this class creates an RDF Feature type and stores each RDF Statement as an RDF Feature in the datastore. Each feature + * contains the standard set of GeoWave attributes (Geometry, Start Date, and End Date). The GeoWaveGeoIndexer populates the Geometry + * attribute by parsing the Well-Known Text contained in the RDF Statement's object literal value. + * <p> + * The RDF Feature contains four additional attributes, one for each component of the RDF Statement. These attributes are: + * <p> + * <table border="1"> + * <tr> + * <th>Name</th> + * <th>Symbol</th> + * <th>Type</th> + * </tr> + * <tr> + * <td>Subject Attribute</td> + * <td>S</td> + * <td>String</td> + * </tr> + * + * <tr> + * <td>Predicate Attribute</td> + * <td>P</td> + * <td>String</td> + * </tr> + * + * <tr> + * <td>Object Attribute</td> + * <td>O</td> + * <td>String</td> + * </tr> + * + * <tr> + * <td>Context Attribute</td> + * <td>C</td> + * <td>String</td> + * </tr> + * </table> + */ +public class GeoWaveGeoIndexer extends AbstractAccumuloIndexer implements GeoIndexer { + + private static final String TABLE_SUFFIX = "geo"; + + private static final Logger logger = Logger.getLogger(GeoWaveGeoIndexer.class); + + private static final String FEATURE_NAME = "RDF"; + + private static final String SUBJECT_ATTRIBUTE = "S"; + private static final String PREDICATE_ATTRIBUTE = "P"; + private static final String OBJECT_ATTRIBUTE = "O"; + private static final String CONTEXT_ATTRIBUTE = "C"; + private static final String GEO_ID_ATTRIBUTE = "geo_id"; + private static final String GEOMETRY_ATTRIBUTE = "geowave_index_geometry"; + + private Set<URI> validPredicates; + private Configuration conf; + private FeatureStore<SimpleFeatureType, SimpleFeature> featureStore; + private FeatureSource<SimpleFeatureType, SimpleFeature> featureSource; + private SimpleFeatureType featureType; + private FeatureDataAdapter featureDataAdapter; + private DataStore geoToolsDataStore; + private
mil.nga.giat.geowave.core.store.DataStore geoWaveDataStore; + private final PrimaryIndex index = new SpatialDimensionalityTypeProvider().createPrimaryIndex(); + private boolean isInit = false; + + //initialization occurs in setConf because index is created using reflection + @Override + public void setConf(final Configuration conf) { + this.conf = conf; + if (!isInit) { + try { + initInternal(); + isInit = true; + } catch (final IOException e) { + logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); + throw new RuntimeException(e); + } + } + } + + @Override + public Configuration getConf() { + return conf; + } + + /** + * @return the internal GeoTools{@link DataStore} used by the {@link GeoWaveGeoIndexer}. + */ + public DataStore getGeoToolsDataStore() { + return geoToolsDataStore; + } + + /** + * @return the internal GeoWave {@link DataStore} used by the {@link GeoWaveGeoIndexer}. + */ + public mil.nga.giat.geowave.core.store.DataStore getGeoWaveDataStore() { + return geoWaveDataStore; + } + + private void initInternal() throws IOException { + validPredicates = ConfigUtils.getGeoPredicates(conf); + + try { + geoToolsDataStore = createDataStore(conf); + geoWaveDataStore = ((GeoWaveGTDataStore) geoToolsDataStore).getDataStore(); + } catch (final GeoWavePluginException e) { + logger.error("Failed to create GeoWave data store", e); + } + + try { + featureType = getStatementFeatureType(geoToolsDataStore); + } catch (final IOException | SchemaException e) { + throw new IOException(e); + } + + featureDataAdapter = new FeatureDataAdapter(featureType); + + featureSource = geoToolsDataStore.getFeatureSource(featureType.getName()); + if (!(featureSource instanceof FeatureStore)) { + throw new IllegalStateException("Could not retrieve feature store"); + } + featureStore = (FeatureStore<SimpleFeatureType, SimpleFeature>) featureSource; + } + + public Map<String, Serializable> getParams(final Configuration conf) { + // get the configuration parameters + 
final Instance instance = ConfigUtils.getInstance(conf); + final String instanceId = instance.getInstanceName(); + final String zookeepers = instance.getZooKeepers(); + final String user = ConfigUtils.getUsername(conf); + final String password = ConfigUtils.getPassword(conf); + final String auths = ConfigUtils.getAuthorizations(conf).toString(); + final String tableName = getTableName(conf); + final String tablePrefix = ConfigUtils.getTablePrefix(conf); + + final Map<String, Serializable> params = new HashMap<>(); + params.put("zookeeper", zookeepers); + params.put("instance", instanceId); + params.put("user", user); + params.put("password", password); + params.put("namespace", tableName); + params.put("gwNamespace", tablePrefix + getClass().getSimpleName()); + + params.put("Lock Management", LockManagementType.MEMORY.toString()); + params.put("Authorization Management Provider", AuthorizationManagementProviderType.EMPTY.toString()); + params.put("Authorization Data URL", null); + params.put("Transaction Buffer Size", 10000); + params.put("Query Index Strategy", QueryIndexStrategyType.HEURISTIC_MATCH.toString()); + return params; + } + + /** + * Creates the {@link DataStore} for the {@link GeoWaveGeoIndexer}. + * @param conf the {@link Configuration}. + * @return the {@link DataStore}. 
+ */ + public DataStore createDataStore(final Configuration conf) throws IOException, GeoWavePluginException { + final Map<String, Serializable> params = getParams(conf); + final Instance instance = ConfigUtils.getInstance(conf); + final boolean useMock = instance instanceof MockInstance; + + final StoreFactoryFamilySpi storeFactoryFamily; + if (useMock) { + storeFactoryFamily = new MemoryStoreFactoryFamily(); + } else { + storeFactoryFamily = new AccumuloStoreFactoryFamily(); + } + + final GeoWaveGTDataStoreFactory geoWaveGTDataStoreFactory = new GeoWaveGTDataStoreFactory(storeFactoryFamily); + final DataStore dataStore = geoWaveGTDataStoreFactory.createNewDataStore(params); + + return dataStore; + } + + private static SimpleFeatureType getStatementFeatureType(final DataStore dataStore) throws IOException, SchemaException { + SimpleFeatureType featureType; + + final String[] datastoreFeatures = dataStore.getTypeNames(); + if (Arrays.asList(datastoreFeatures).contains(FEATURE_NAME)) { + featureType = dataStore.getSchema(FEATURE_NAME); + } else { + featureType = DataUtilities.createType(FEATURE_NAME, + SUBJECT_ATTRIBUTE + ":String," + + PREDICATE_ATTRIBUTE + ":String," + + OBJECT_ATTRIBUTE + ":String," + + CONTEXT_ATTRIBUTE + ":String," + + GEOMETRY_ATTRIBUTE + ":Geometry:srid=4326," + + GEO_ID_ATTRIBUTE + ":String"); + + dataStore.createSchema(featureType); + } + return featureType; + } + + @Override + public void storeStatements(final Collection<RyaStatement> ryaStatements) throws IOException { + // create a feature collection + final DefaultFeatureCollection featureCollection = new DefaultFeatureCollection(); + for (final RyaStatement ryaStatement : ryaStatements) { + final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement); + // if the predicate list is empty, accept all predicates. 
+ // Otherwise, make sure the predicate is on the "valid" list + final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); + + if (isValidPredicate && (statement.getObject() instanceof Literal)) { + try { + final SimpleFeature feature = createFeature(featureType, statement); + featureCollection.add(feature); + } catch (final ParseException e) { + logger.warn("Error getting geo from statement: " + statement.toString(), e); + } + } + } + + // write this feature collection to the store + if (!featureCollection.isEmpty()) { + featureStore.addFeatures(featureCollection); + } + } + + @Override + public void storeStatement(final RyaStatement statement) throws IOException { + storeStatements(Collections.singleton(statement)); + } + + private static SimpleFeature createFeature(final SimpleFeatureType featureType, final Statement statement) throws ParseException { + final String subject = StatementSerializer.writeSubject(statement); + final String predicate = StatementSerializer.writePredicate(statement); + final String object = StatementSerializer.writeObject(statement); + final String context = StatementSerializer.writeContext(statement); + + // create the feature + final Object[] noValues = {}; + + // create the hash + final String statementId = Md5Hash.md5Base64(StatementSerializer.writeStatement(statement)); + final SimpleFeature newFeature = SimpleFeatureBuilder.build(featureType, noValues, statementId); + + // write the statement data to the fields + final Geometry geom = GeoParseUtils.getGeometry(statement); + if(geom == null || geom.isEmpty() || !geom.isValid()) { + throw new ParseException("Could not create geometry for statement " + statement); + } + newFeature.setDefaultGeometry(geom); + + newFeature.setAttribute(SUBJECT_ATTRIBUTE, subject); + newFeature.setAttribute(PREDICATE_ATTRIBUTE, predicate); + newFeature.setAttribute(OBJECT_ATTRIBUTE, object); + newFeature.setAttribute(CONTEXT_ATTRIBUTE, context); + 
// GeoWave does not support querying based on a user generated feature ID + // So, we create a separate ID attribute that it can query on. + newFeature.setAttribute(GEO_ID_ATTRIBUTE, statementId); + + // preserve the ID that we created for this feature + // (set the hint to FALSE to have GeoTools generate IDs) + newFeature.getUserData().put(Hints.USE_PROVIDED_FID, java.lang.Boolean.TRUE); + + return newFeature; + } + + private CloseableIteration<Statement, QueryEvaluationException> performQuery(final String type, final Geometry geometry, + final StatementConstraints contraints) { + final List<String> filterParms = new ArrayList<String>(); + + filterParms.add(type + "(" + GEOMETRY_ATTRIBUTE + ", " + geometry + " )"); + + if (contraints.hasSubject()) { + filterParms.add("( " + SUBJECT_ATTRIBUTE + "= '" + contraints.getSubject() + "') "); + } + if (contraints.hasContext()) { + filterParms.add("( " + CONTEXT_ATTRIBUTE + "= '" + contraints.getContext() + "') "); + } + if (contraints.hasPredicates()) { + final List<String> predicates = new ArrayList<String>(); + for (final URI u : contraints.getPredicates()) { + predicates.add("( " + PREDICATE_ATTRIBUTE + "= '" + u.stringValue() + "') "); + } + filterParms.add("(" + StringUtils.join(predicates, " OR ") + ")"); + } + + final String filterString = StringUtils.join(filterParms, " AND "); + logger.info("Performing geowave query : " + filterString); + + return getIteratorWrapper(filterString); + } + + private CloseableIteration<Statement, QueryEvaluationException> getIteratorWrapper(final String filterString) { + + return new CloseableIteration<Statement, QueryEvaluationException>() { + + private CloseableIterator<SimpleFeature> featureIterator = null; + + CloseableIterator<SimpleFeature> getIterator() throws QueryEvaluationException { + if (featureIterator == null) { + Filter cqlFilter; + try { + cqlFilter = ECQL.toFilter(filterString); + } catch (final CQLException e) { + logger.error("Error parsing query: " + filterString, 
e); + throw new QueryEvaluationException(e); + } + + final CQLQuery cqlQuery = new CQLQuery(null, cqlFilter, featureDataAdapter); + final QueryOptions queryOptions = new QueryOptions(featureDataAdapter, index); + + try { + featureIterator = geoWaveDataStore.query(queryOptions, cqlQuery); + } catch (final Exception e) { + logger.error("Error performing query: " + filterString, e); + throw new QueryEvaluationException(e); + } + } + return featureIterator; + } + + @Override + public boolean hasNext() throws QueryEvaluationException { + return getIterator().hasNext(); + } + + @Override + public Statement next() throws QueryEvaluationException { + final SimpleFeature feature = getIterator().next(); + final String subjectString = feature.getAttribute(SUBJECT_ATTRIBUTE).toString(); + final String predicateString = feature.getAttribute(PREDICATE_ATTRIBUTE).toString(); + final String objectString = feature.getAttribute(OBJECT_ATTRIBUTE).toString(); + final Object context = feature.getAttribute(CONTEXT_ATTRIBUTE); + final String contextString = context != null ? 
context.toString() : ""; + final Statement statement = StatementSerializer.readStatement(subjectString, predicateString, objectString, contextString); + return statement; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Remove not implemented"); + } + + @Override + public void close() throws QueryEvaluationException { + try { + getIterator().close(); + } catch (final IOException e) { + throw new QueryEvaluationException(e); + } + } + }; + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryEquals(final Geometry query, final StatementConstraints contraints) { + return performQuery("EQUALS", query, contraints); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryDisjoint(final Geometry query, final StatementConstraints contraints) { + return performQuery("DISJOINT", query, contraints); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryIntersects(final Geometry query, final StatementConstraints contraints) { + return performQuery("INTERSECTS", query, contraints); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryTouches(final Geometry query, final StatementConstraints contraints) { + return performQuery("TOUCHES", query, contraints); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryCrosses(final Geometry query, final StatementConstraints contraints) { + return performQuery("CROSSES", query, contraints); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryWithin(final Geometry query, final StatementConstraints contraints) { + return performQuery("WITHIN", query, contraints); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryContains(final Geometry query, final StatementConstraints contraints) { + return performQuery("CONTAINS", query, contraints); + } + + @Override + public 
CloseableIteration<Statement, QueryEvaluationException> queryOverlaps(final Geometry query, final StatementConstraints contraints) { + return performQuery("OVERLAPS", query, contraints); + } + + @Override + public Set<URI> getIndexablePredicates() { + return validPredicates; + } + + @Override + public void flush() throws IOException { + // TODO cache and flush features instead of writing them one at a time + } + + @Override + public void close() throws IOException { + flush(); + } + + + @Override + public String getTableName() { + return getTableName(conf); + } + + /** + * Get the Accumulo table that will be used by this index. + * @param conf + * @return table name guaranteed to be used by instances of this index + */ + public static String getTableName(final Configuration conf) { + return makeTableName( ConfigUtils.getTablePrefix(conf) ); + } + + /** + * Make the Accumulo table name used by this indexer for a specific instance of Rya. + * + * @param ryaInstanceName - The name of the Rya instance the table name is for. (not null) + * @return The Accumulo table name used by this indexer for a specific instance of Rya. + */ + public static String makeTableName(final String ryaInstanceName) { + requireNonNull(ryaInstanceName); + return ryaInstanceName + TABLE_SUFFIX; + } + + private void deleteStatements(final Collection<RyaStatement> ryaStatements) throws IOException { + // create a feature collection + final DefaultFeatureCollection featureCollection = new DefaultFeatureCollection(); + + for (final RyaStatement ryaStatement : ryaStatements) { + final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement); + // if the predicate list is empty, accept all predicates. 
+ // Otherwise, make sure the predicate is on the "valid" list + final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); + + if (isValidPredicate && (statement.getObject() instanceof Literal)) { + try { + final SimpleFeature feature = createFeature(featureType, statement); + featureCollection.add(feature); + } catch (final ParseException e) { + logger.warn("Error getting geo from statement: " + statement.toString(), e); + } + } + } + + // remove this feature collection from the store + if (!featureCollection.isEmpty()) { + final Set<Identifier> featureIds = new HashSet<Identifier>(); + final FilterFactory filterFactory = CommonFactoryFinder.getFilterFactory(null); + final Set<String> stringIds = DataUtilities.fidSet(featureCollection); + for (final String id : stringIds) { + featureIds.add(filterFactory.featureId(id)); + } + + final String filterString = stringIds.stream().collect(Collectors.joining("','", "'", "'")); + + Filter filter = null; + try { + filter = ECQL.toFilter(GEO_ID_ATTRIBUTE + " IN (" + filterString + ")", filterFactory); + } catch (final CQLException e) { + logger.error("Unable to generate filter for deleting the statement.", e); + } + + featureStore.removeFeatures(filter); + } + } + + + @Override + public void deleteStatement(final RyaStatement statement) throws IOException { + deleteStatements(Collections.singleton(statement)); + } + + @Override + public void init() { + } + + @Override + public void setConnector(final Connector connector) { + } + + @Override + public void destroy() { + } + + @Override + public void purge(final RdfCloudTripleStoreConfiguration configuration) { + // delete existing data + geoWaveDataStore.delete(new QueryOptions(), new EverythingQuery()); + } + + @Override + public void dropAndDestroy() { + } + + /** + * The list of supported Geo Wave {@code LockingManagementFactory} types. 
+ */ + private static enum LockManagementType { + MEMORY("memory"); + + private final String name; + + /** + * Creates a new {@link LockManagementType}. + * @param name the name of the type. (not {@code null}) + */ + private LockManagementType(final String name) { + this.name = checkNotNull(name); + } + + @Override + public String toString() { + return name; + } + } + + /** + * The list of supported Geo Wave {@code AuthorizationFactorySPI } types. + */ + private static enum AuthorizationManagementProviderType { + EMPTY("empty"), + JSON_FILE("jsonFile"); + + private final String name; + + /** + * Creates a new {@link AuthorizationManagementProviderType}. + * @param name the name of the type. (not {@code null}) + */ + private AuthorizationManagementProviderType(final String name) { + this.name = checkNotNull(name); + } + + @Override + public String toString() { + return name; + } + } + + /** + * The list of supported Geo Wave {@code IndexQueryStrategySPI} types. + */ + private static enum QueryIndexStrategyType { + BEST_MATCH("Best Match"), + HEURISTIC_MATCH("Heuristic Match"), + PRESERVE_LOCALITY("Preserve Locality"); + + private final String name; + + /** + * Creates a new {@link QueryIndexStrategyType}. + * @param name the name of the type. 
(not {@code null}) + */ + private QueryIndexStrategyType(final String name) { + this.name = checkNotNull(name); + } + + @Override + public String toString() { + return name; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fe6cbccf/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/GeoIndexingTestUtils.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/GeoIndexingTestUtils.java b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/GeoIndexingTestUtils.java new file mode 100644 index 0000000..b0c636d --- /dev/null +++ b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/GeoIndexingTestUtils.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing; + +import java.util.HashSet; +import java.util.Set; + +import info.aduna.iteration.CloseableIteration; + +/** + * Utility methods to help test geo indexing methods. + */ +public final class GeoIndexingTestUtils { + /** + * Private constructor to prevent instantiation. 
+ */ + private GeoIndexingTestUtils () { + } + + /** + * Generates a set of items from the specified iterator. + * @param iter a {@link CloseableIteration}. + * @return the {@link Set} of items from the iterator or an empty set if + * none were found. + * @throws Exception + */ + public static <X> Set<X> getSet(final CloseableIteration<X, ?> iter) throws Exception { + final Set<X> set = new HashSet<X>(); + while (iter.hasNext()) { + final X item = iter.next(); + set.add(item); + } + return set; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fe6cbccf/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/accumulo/geo/GeoIndexerSfTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/accumulo/geo/GeoIndexerSfTest.java b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/accumulo/geo/GeoIndexerSfTest.java index 4b3af75..e61ef35 100644 --- a/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/accumulo/geo/GeoIndexerSfTest.java +++ b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/accumulo/geo/GeoIndexerSfTest.java @@ -1,5 +1,3 @@ -package org.apache.rya.indexing.accumulo.geo; - /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -8,9 +6,9 @@ package org.apache.rya.indexing.accumulo.geo; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,15 +16,27 @@ package org.apache.rya.indexing.accumulo.geo; * specific language governing permissions and limitations * under the License. */ +package org.apache.rya.indexing.accumulo.geo; + +import static org.apache.rya.indexing.GeoIndexingTestUtils.getSet; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.util.Arrays; import java.util.Collection; -import java.util.HashSet; import java.util.Map; import java.util.Set; import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.api.resolver.RyaToRdfConversions; +import org.apache.rya.indexing.GeoConstants; +import org.apache.rya.indexing.GeoIndexerType; +import org.apache.rya.indexing.OptionalConfigUtils; +import org.apache.rya.indexing.StatementConstraints; +import org.apache.rya.indexing.accumulo.ConfigUtils; import org.geotools.geometry.jts.Geometries; import org.junit.Assert; import org.junit.Before; @@ -44,7 +54,7 @@ import org.openrdf.model.impl.StatementImpl; import org.openrdf.model.impl.URIImpl; import org.openrdf.model.impl.ValueFactoryImpl; -import com.google.common.collect.Maps; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; import com.vividsolutions.jts.geom.Coordinate; import com.vividsolutions.jts.geom.Geometry; @@ -59,13 +69,6 @@ import com.vividsolutions.jts.io.ParseException; import com.vividsolutions.jts.io.gml2.GMLWriter; import info.aduna.iteration.CloseableIteration; -import org.apache.rya.accumulo.AccumuloRdfConfiguration; -import 
org.apache.rya.api.domain.RyaStatement; -import org.apache.rya.api.resolver.RdfToRyaConversions; -import org.apache.rya.api.resolver.RyaToRdfConversions; -import org.apache.rya.indexing.GeoConstants; -import org.apache.rya.indexing.StatementConstraints; -import org.apache.rya.indexing.accumulo.ConfigUtils; /** * Tests all of the "simple functions" of the geoindexer specific to GML. @@ -82,16 +85,16 @@ public class GeoIndexerSfTest { // Here is the landscape: /** * <pre> - * 2---+---+---+---+---+---+ - * | F |G | - * 1 A o(-1,1) o C | - * | | | - * 0---+---+ +---+---+(3,0) - * | | E | - * -1 B + .---+---+ - * | | /| | | - * -2---+---+-/-+---+ + - * ^ / | D | + * 2---+---+---+---+---+---+ + * | F |G | + * 1 A o(-1,1) o C | + * | | | + * 0---+---+ +---+---+(3,0) + * | | E | + * -1 B + .---+---+ + * | | /| | | + * -2---+---+-/-+---+ + + * ^ / | D | * -3 -2 -1 0---1---2 3 4 * </pre> **/ @@ -105,40 +108,42 @@ public class GeoIndexerSfTest { private static final LineString E = line(-1, -3, 0, -1); - private static final Map<Geometry, String> names = Maps.newHashMap(); - static { - names.put(A, "A"); - names.put(B, "B"); - names.put(C, "C"); - names.put(D, "D"); - names.put(E, "E"); - names.put(F, "F"); - names.put(G, "G"); - } + private static final Map<Geometry, String> NAMES = ImmutableMap.<Geometry, String>builder() + .put(A, "A") + .put(B, "B") + .put(C, "C") + .put(D, "D") + .put(E, "E") + .put(F, "F") + .put(G, "G") + .build(); /** - * JUnit 4 parameterized iterates thru this list and calls the constructor with each. - * For each test, Call the constructor three times, for WKT and for GML encoding 1, and GML encoding 2 - * @return + * JUnit 4 parameterized iterates thru this list and calls the constructor with each. 
+ * For each test, Call the constructor three times, for WKT and for GML encoding 1, and GML encoding 2 */ - final static URI useJtsLibEncoding = new URIImpl("uri:useLib") ; - final static URI useRoughEncoding = new URIImpl("uri:useRough") ; - + private static final URI USE_JTS_LIB_ENCODING = new URIImpl("uri:useLib") ; + private static final URI USE_ROUGH_ENCODING = new URIImpl("uri:useRough") ; + @Parameters public static Collection<URI[]> constructorData() { - URI[][] data = new URI[][] { { GeoConstants.XMLSCHEMA_OGC_WKT,useJtsLibEncoding }, { GeoConstants.XMLSCHEMA_OGC_GML,useJtsLibEncoding } , { GeoConstants.XMLSCHEMA_OGC_GML,useRoughEncoding } }; + final URI[][] data = new URI[][] { { GeoConstants.XMLSCHEMA_OGC_WKT, USE_JTS_LIB_ENCODING }, { GeoConstants.XMLSCHEMA_OGC_GML, USE_JTS_LIB_ENCODING }, { GeoConstants.XMLSCHEMA_OGC_GML, USE_ROUGH_ENCODING } }; return Arrays.asList(data); } - private URI schemaToTest; - private URI encodeMethod; + private final URI schemaToTest; + private final URI encodeMethod; + /** - * Constructor required by JUnit parameterized runner. See data() for constructor values. + * Constructor required by JUnit parameterized runner. See {@link #constructorData()} for constructor values. + * @param schemaToTest the schema to test {@link URI}. + * @param encodeMethod the encode method {@link URI}. */ - public GeoIndexerSfTest(URI schemaToTest, URI encodeMethod) { - this.schemaToTest=schemaToTest; + public GeoIndexerSfTest(final URI schemaToTest, final URI encodeMethod) { + this.schemaToTest = schemaToTest; this.encodeMethod = encodeMethod; } + /** * Run before each test method. 
* @throws Exception @@ -147,40 +152,45 @@ public class GeoIndexerSfTest { public void before() throws Exception { conf = new AccumuloRdfConfiguration(); conf.setTablePrefix("triplestore_"); - String tableName = GeoMesaGeoIndexer.getTableName(conf); + final String tableName = GeoMesaGeoIndexer.getTableName(conf); conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, true); conf.set(ConfigUtils.CLOUDBASE_USER, "USERNAME"); conf.set(ConfigUtils.CLOUDBASE_PASSWORD, "PASS"); + conf.set(ConfigUtils.CLOUDBASE_INSTANCE, "INSTANCE"); + conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, "localhost"); conf.set(ConfigUtils.CLOUDBASE_AUTHS, "U"); + conf.set(OptionalConfigUtils.USE_GEO, "true"); + conf.set(OptionalConfigUtils.GEO_INDEXER_TYPE, GeoIndexerType.GEO_MESA.toString()); - TableOperations tops = ConfigUtils.getConnector(conf).tableOperations(); + final TableOperations tops = ConfigUtils.getConnector(conf).tableOperations(); // get all of the table names with the prefix - Set<String> toDel = Sets.newHashSet(); - for (String t : tops.list()) { + final Set<String> toDel = Sets.newHashSet(); + for (final String t : tops.list()) { if (t.startsWith(tableName)) { toDel.add(t); } } - for (String t : toDel) { + for (final String t : toDel) { tops.delete(t); } g = new GeoMesaGeoIndexer(); g.setConf(conf); // Convert the statements as schema WKT or GML, then GML has two methods to encode. 
- g.storeStatement(RyaStatement(A,schemaToTest, encodeMethod)); - g.storeStatement(RyaStatement(B,schemaToTest, encodeMethod)); - g.storeStatement(RyaStatement(C,schemaToTest, encodeMethod)); - g.storeStatement(RyaStatement(D,schemaToTest, encodeMethod)); - g.storeStatement(RyaStatement(F,schemaToTest, encodeMethod)); - g.storeStatement(RyaStatement(E,schemaToTest, encodeMethod)); - g.storeStatement(RyaStatement(G,schemaToTest, encodeMethod)); + g.storeStatement(createRyaStatement(A, schemaToTest, encodeMethod)); + g.storeStatement(createRyaStatement(B, schemaToTest, encodeMethod)); + g.storeStatement(createRyaStatement(C, schemaToTest, encodeMethod)); + g.storeStatement(createRyaStatement(D, schemaToTest, encodeMethod)); + g.storeStatement(createRyaStatement(F, schemaToTest, encodeMethod)); + g.storeStatement(createRyaStatement(E, schemaToTest, encodeMethod)); + g.storeStatement(createRyaStatement(G, schemaToTest, encodeMethod)); } - private static RyaStatement RyaStatement(Geometry geo, URI schema, URI encodingMethod) { + private static RyaStatement createRyaStatement(final Geometry geo, final URI schema, final URI encodingMethod) { return RdfToRyaConversions.convertStatement(genericStatement(geo,schema,encodingMethod)); } - private static Statement genericStatement(Geometry geo, URI schema, URI encodingMethod) { + + private static Statement genericStatement(final Geometry geo, final URI schema, final URI encodingMethod) { if (schema.equals(GeoConstants.XMLSCHEMA_OGC_WKT)) { return genericStatementWkt(geo); } else if (schema.equals(GeoConstants.XMLSCHEMA_OGC_GML)) { @@ -188,31 +198,34 @@ public class GeoIndexerSfTest { } throw new Error("schema unsupported: "+schema); } - private static Statement genericStatementWkt(Geometry geo) { - ValueFactory vf = new ValueFactoryImpl(); - Resource subject = vf.createURI("uri:" + names.get(geo)); - URI predicate = GeoConstants.GEO_AS_WKT; - Value object = vf.createLiteral(geo.toString(), GeoConstants.XMLSCHEMA_OGC_WKT); + + 
private static Statement genericStatementWkt(final Geometry geo) { + final ValueFactory vf = new ValueFactoryImpl(); + final Resource subject = vf.createURI("uri:" + NAMES.get(geo)); + final URI predicate = GeoConstants.GEO_AS_WKT; + final Value object = vf.createLiteral(geo.toString(), GeoConstants.XMLSCHEMA_OGC_WKT); return new StatementImpl(subject, predicate, object); } - private static Statement genericStatementGml(Geometry geo, URI encodingMethod) { - ValueFactory vf = new ValueFactoryImpl(); - Resource subject = vf.createURI("uri:" + names.get(geo)); - URI predicate = GeoConstants.GEO_AS_GML; - + private static Statement genericStatementGml(final Geometry geo, final URI encodingMethod) { + final ValueFactory vf = new ValueFactoryImpl(); + final Resource subject = vf.createURI("uri:" + NAMES.get(geo)); + final URI predicate = GeoConstants.GEO_AS_GML; + final String gml ; - if (encodingMethod==useJtsLibEncoding) + if (encodingMethod == USE_JTS_LIB_ENCODING) { gml = geoToGmlUseJtsLib(geo); - else if (encodingMethod==useRoughEncoding) + } else if (encodingMethod == USE_ROUGH_ENCODING) { gml = geoToGmlRough(geo); - else + } + else { throw new Error("invalid encoding method: "+encodingMethod); // System.out.println("===created GML===="); // System.out.println(gml); // System.out.println("========== GML===="); + } - Value object = vf.createLiteral(gml, GeoConstants.XMLSCHEMA_OGC_GML); + final Value object = vf.createLiteral(gml, GeoConstants.XMLSCHEMA_OGC_GML); return new StatementImpl(subject, predicate, object); } @@ -221,17 +234,17 @@ public class GeoIndexerSfTest { * @param geo base Geometry gets delegated * @return String gml encoding of the geomoetry */ - private static String geoToGmlUseJtsLib(Geometry geo) { - int srid = geo.getSRID(); - GMLWriter gmlWriter = new GMLWriter(); + private static String geoToGmlUseJtsLib(final Geometry geo) { + final int srid = geo.getSRID(); + final GMLWriter gmlWriter = new GMLWriter(); gmlWriter.setNamespace(false); 
gmlWriter.setPrefix(null); - + if (srid != -1 || srid != 0) { gmlWriter.setSrsName("EPSG:" + geo.getSRID()); } - String gml = gmlWriter.write(geo); - // Hack to replace a gml 2.0 deprecated element in the Polygon. + final String gml = gmlWriter.write(geo); + // Hack to replace a gml 2.0 deprecated element in the Polygon. // It should tolerate this as it does other depreciated elements like <gml:coordinates>. return gml.replace("outerBoundaryIs", "exterior"); } @@ -241,70 +254,72 @@ public class GeoIndexerSfTest { * @param geo base Geometry gets delegated * @return String gml encoding of the gemoetry */ - private static String geoToGmlRough(Geometry geo) { - final Geometries theType = org.geotools.geometry.jts.Geometries.get(geo); - switch (theType) { - case POINT: - return geoToGml((Point)geo); - case LINESTRING: - return geoToGml((LineString)geo); - case POLYGON: - return geoToGml((Polygon)geo); - case MULTIPOINT: - case MULTILINESTRING: - case MULTIPOLYGON: - default: - throw new Error("No code to convert to GML for this type: "+theType); - } + private static String geoToGmlRough(final Geometry geo) { + final Geometries theType = org.geotools.geometry.jts.Geometries.get(geo); + switch (theType) { + case POINT: + return geoToGml((Point)geo); + case LINESTRING: + return geoToGml((LineString)geo); + case POLYGON: + return geoToGml((Polygon)geo); + case MULTIPOINT: + case MULTILINESTRING: + case MULTIPOLYGON: + default: + throw new Error("No code to convert to GML for this type: "+theType); } + } - private static Point point(double x, double y) { + private static Point point(final double x, final double y) { return gf.createPoint(new Coordinate(x, y)); } - private static String geoToGml(Point point) { + private static String geoToGml(final Point point) { //CRS:84 long X,lat Y //ESPG:4326 lat Y,long X return "<Point"// - + " srsName='CRS:84'"// TODO: point.getSRID() - + "><pos>"+point.getX()+" "+point.getY()+"</pos> "// assumes Y=lat X=long + + " srsName='CRS:84'"// 
TODO: point.getSRID() + + "><pos>"+point.getX()+" "+point.getY()+"</pos> "// assumes Y=lat X=long + " </Point>"; } - private static LineString line(double x1, double y1, double x2, double y2) { + private static LineString line(final double x1, final double y1, final double x2, final double y2) { return new LineString(new PackedCoordinateSequence.Double(new double[] { x1, y1, x2, y2 }, 2), gf); } - /** + + /** * convert a lineString geometry to GML * @param line * @return String that is XML that is a GMLLiteral of line */ - private static String geoToGml(LineString line) { - StringBuilder coordString = new StringBuilder() ; - for (Coordinate coor : line.getCoordinates()) { + private static String geoToGml(final LineString line) { + final StringBuilder coordString = new StringBuilder() ; + for (final Coordinate coor : line.getCoordinates()) { coordString.append(" ").append(coor.x).append(" ").append(coor.y); //ESPG:4326 lat/long } - return " <gml:LineString srsName=\"http://www.opengis.net/def/crs/EPSG/0/4326\" xmlns:gml='http://www.opengis.net/gml'>\n" + return " <gml:LineString srsName=\"http://www.opengis.net/def/crs/EPSG/0/4326\" xmlns:gml='http://www.opengis.net/gml'>\n" + "<gml:posList srsDimension=\"2\">"// + coordString // + "</gml:posList></gml:LineString >"; } - private static Polygon poly(double[] arr) { - LinearRing r1 = gf.createLinearRing(new PackedCoordinateSequence.Double(arr, 2)); - Polygon p1 = gf.createPolygon(r1, new LinearRing[] {}); + private static Polygon poly(final double[] arr) { + final LinearRing r1 = gf.createLinearRing(new PackedCoordinateSequence.Double(arr, 2)); + final Polygon p1 = gf.createPolygon(r1, new LinearRing[] {}); return p1; } - /** + + /** * convert a Polygon geometry to GML * @param geometry * @return String that is XML that is a GMLLiteral of line */ - private static String geoToGml(Polygon poly) { - StringBuilder coordString = new StringBuilder() ; - for (Coordinate coor : poly.getCoordinates()) { + private static String 
geoToGml(final Polygon poly) { + final StringBuilder coordString = new StringBuilder() ; + for (final Coordinate coor : poly.getCoordinates()) { coordString.append(" ").append(coor.x).append(" ").append(coor.y); //ESPG:4326 lat/long - //with commas: coordString.append(" ").append(coor.x).append(",").append(coor.y); + //with commas: coordString.append(" ").append(coor.x).append(",").append(coor.y); } return "<gml:Polygon srsName=\"EPSG:4326\" xmlns:gml='http://www.opengis.net/gml'>\r\n"// + "<gml:exterior><gml:LinearRing>\r\n"// @@ -314,28 +329,20 @@ public class GeoIndexerSfTest { + "</gml:LinearRing></gml:exterior>\r\n</gml:Polygon>\r\n"; } - private static double[] bbox(double x1, double y1, double x2, double y2) { + private static double[] bbox(final double x1, final double y1, final double x2, final double y2) { return new double[] { x1, y1, x1, y2, x2, y2, x2, y1, x1, y1 }; } - public void compare(CloseableIteration<Statement, ?> actual, Geometry... expected) throws Exception { - Set<Statement> expectedSet = Sets.newHashSet(); - for (Geometry geo : expected) { - expectedSet.add(RyaToRdfConversions.convertStatement(RyaStatement(geo,this.schemaToTest, encodeMethod))); + private void compare(final CloseableIteration<Statement, ?> actual, final Geometry... 
expected) throws Exception { + final Set<Statement> expectedSet = Sets.newHashSet(); + for (final Geometry geo : expected) { + expectedSet.add(RyaToRdfConversions.convertStatement(createRyaStatement(geo, this.schemaToTest, encodeMethod))); } Assert.assertEquals(expectedSet, getSet(actual)); } - private static <X> Set<X> getSet(CloseableIteration<X, ?> iter) throws Exception { - Set<X> set = new HashSet<X>(); - while (iter.hasNext()) { - set.add(iter.next()); - } - return set; - } - - private static Geometry[] EMPTY_RESULTS = {}; + private static final Geometry[] EMPTY_RESULTS = {}; @Test public void testParsePoly() throws Exception { @@ -354,13 +361,14 @@ public class GeoIndexerSfTest { /** * Convert Geometry to Wkt|GML (schemaToTest), parse to Geometry, and compare to original. + * @param originalGeom the original {@link Geometry}. * @throws ParseException */ - public void assertParseable(Geometry originalGeom) throws ParseException { - Geometry parsedGeom = GeoParseUtils.getGeometry(genericStatement(originalGeom,schemaToTest, encodeMethod)); + public void assertParseable(final Geometry originalGeom) throws ParseException { + final Geometry parsedGeom = GeoParseUtils.getGeometry(genericStatement(originalGeom,schemaToTest, encodeMethod)); assertTrue("Parsed should equal original: "+originalGeom+" parsed: "+parsedGeom, originalGeom.equalsNorm(parsedGeom)); - // assertEquals( originalGeom, parsedGeom ); //also passes - // assertTrue( originalGeom.equalsExact(parsedGeom) ); //also passes + assertEquals( originalGeom, parsedGeom ); //also passes + assertTrue( originalGeom.equalsExact(parsedGeom) ); //also passes } @Test @@ -376,7 +384,6 @@ public class GeoIndexerSfTest { // poly compare(g.queryEquals(A, EMPTY_CONSTRAINTS), A); compare(g.queryEquals(poly(bbox(-2, -2, 1, 2)), EMPTY_CONSTRAINTS), EMPTY_RESULTS); - } @Test @@ -399,14 +406,14 @@ public class GeoIndexerSfTest { // scala.MatchError: POINT (2 4) (of class com.vividsolutions.jts.geom.Point) // at 
org.locationtech.geomesa.filter.FilterHelper$.updateToIDLSafeFilter(FilterHelper.scala:53) // compare(g.queryIntersects(F, EMPTY_CONSTRAINTS), A, F); - // compare(g.queryIntersects(F, EMPTY_CONSTRAINTS), EMPTY_RESULTS); + // compare(g.queryIntersects(F, EMPTY_CONSTRAINTS), EMPTY_RESULTS); } @Ignore @Test public void testIntersectsLine() throws Exception { // This seems like a bug - // fails with: + // fails with: // scala.MatchError: LINESTRING (2 0, 3 3) (of class com.vividsolutions.jts.geom.LineString) // at org.locationtech.geomesa.filter.FilterHelper$.updateToIDLSafeFilter(FilterHelper.scala:53) //compare(g.queryIntersects(E, EMPTY_CONSTRAINTS), A, E, D);
