http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoMesaGeoIndexer.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoMesaGeoIndexer.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoMesaGeoIndexer.java deleted file mode 100644 index 12a84fd..0000000 --- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoMesaGeoIndexer.java +++ /dev/null @@ -1,520 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.indexing.accumulo.geo; - -import static java.util.Objects.requireNonNull; - -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.log4j.Logger; -import org.apache.rya.accumulo.experimental.AbstractAccumuloIndexer; -import org.apache.rya.api.RdfCloudTripleStoreConfiguration; -import org.apache.rya.api.domain.RyaStatement; -import org.apache.rya.api.resolver.RyaToRdfConversions; -import org.apache.rya.indexing.GeoIndexer; -import org.apache.rya.indexing.Md5Hash; -import org.apache.rya.indexing.OptionalConfigUtils; -import org.apache.rya.indexing.StatementConstraints; -import org.apache.rya.indexing.StatementSerializer; -import org.apache.rya.indexing.accumulo.ConfigUtils; -import org.apache.rya.indexing.accumulo.geo.GeoTupleSet.GeoSearchFunctionFactory.NearQuery; -import org.geotools.data.DataStore; -import org.geotools.data.DataStoreFinder; -import org.geotools.data.DataUtilities; -import org.geotools.data.FeatureSource; -import org.geotools.data.FeatureStore; -import org.geotools.data.Query; -import org.geotools.factory.CommonFactoryFinder; -import org.geotools.factory.Hints; -import org.geotools.feature.DefaultFeatureCollection; -import org.geotools.feature.FeatureIterator; -import org.geotools.feature.SchemaException; -import org.geotools.feature.simple.SimpleFeatureBuilder; -import org.geotools.filter.text.cql2.CQLException; -import org.geotools.filter.text.ecql.ECQL; -import org.locationtech.geomesa.accumulo.index.Constants; -import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes; -import org.opengis.feature.simple.SimpleFeature; -import org.opengis.feature.simple.SimpleFeatureType; -import org.opengis.filter.Filter; -import org.opengis.filter.FilterFactory; -import org.opengis.filter.identity.Identifier; -import org.openrdf.model.Literal; -import org.openrdf.model.Statement; -import org.openrdf.model.URI; -import org.openrdf.query.QueryEvaluationException; - -import com.vividsolutions.jts.geom.Geometry; -import com.vividsolutions.jts.io.ParseException; - -import info.aduna.iteration.CloseableIteration; - -/** - * A {@link GeoIndexer} wrapper around a GeoMesa {@link AccumuloDataStore}. This class configures and connects to the Datastore, creates the - * RDF Feature Type, and interacts with the Datastore. - * <p> - * Specifically, this class creates a RDF Feature type and stores each RDF Statement as a RDF Feature in the datastore. Each feature - * contains the standard set of GeoMesa attributes (Geometry, Start Date, and End Date). The GeoMesaGeoIndexer populates the Geometry - * attribute by parsing the Well-Known Text contained in the RDF Statementâs object literal value. - * <p> - * The RDF Feature contains four additional attributes for each component of the RDF Statement. These attributes are: - * <p> - * <table border="1"> - * <tr> - * <th>Name</th> - * <th>Symbol</th> - * <th>Type</th> - * </tr> - * <tr> - * <td>Subject Attribute</td> - * <td>S</td> - * <td>String</td> - * </tr> - * </tr> - * <tr> - * <td>Predicate Attribute</td> - * <td>P</td> - * <td>String</td> - * </tr> - * </tr> - * <tr> - * <td>Object Attribute</td> - * <td>O</td> - * <td>String</td> - * </tr> - * </tr> - * <tr> - * <td>Context Attribute</td> - * <td>C</td> - * <td>String</td> - * </tr> - * </table> - */ -public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoIndexer { - - private static final String TABLE_SUFFIX = "geo"; - - private static final Logger logger = Logger.getLogger(GeoMesaGeoIndexer.class); - - private static final String FEATURE_NAME = "RDF"; - - private static final String SUBJECT_ATTRIBUTE = "S"; - private static final String PREDICATE_ATTRIBUTE = "P"; - private static final String OBJECT_ATTRIBUTE = "O"; - private static final String CONTEXT_ATTRIBUTE = "C"; - private static final String GEOMETRY_ATTRIBUTE = Constants.SF_PROPERTY_GEOMETRY; - - private Set<URI> validPredicates; - private Configuration conf; - private FeatureStore<SimpleFeatureType, SimpleFeature> featureStore; - private FeatureSource<SimpleFeatureType, SimpleFeature> featureSource; - private SimpleFeatureType featureType; - private boolean isInit = false; - - //initialization occurs in setConf because index is created using reflection - @Override - public void setConf(final Configuration conf) { - this.conf = conf; - if (!isInit) { - try { - initInternal(); - isInit = true; - } catch (final IOException e) { - logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); - throw new RuntimeException(e); - } - } - } - - @Override - public Configuration getConf() { - return conf; - } - - - private void initInternal() throws IOException { - validPredicates = ConfigUtils.getGeoPredicates(conf); - - final DataStore dataStore = createDataStore(conf); - - try { - featureType = getStatementFeatureType(dataStore); - } catch (final IOException | SchemaException e) { - throw new IOException(e); - } - - featureSource = dataStore.getFeatureSource(featureType.getName()); - if (!(featureSource instanceof FeatureStore)) { - throw new IllegalStateException("Could not retrieve feature store"); - } - featureStore = (FeatureStore<SimpleFeatureType, SimpleFeature>) featureSource; - } - - private static DataStore createDataStore(final Configuration conf) throws IOException { - // get the configuration parameters - final Instance instance = ConfigUtils.getInstance(conf); - final boolean useMock = instance instanceof MockInstance; - final String instanceId = instance.getInstanceName(); - final String zookeepers = instance.getZooKeepers(); - final String user = ConfigUtils.getUsername(conf); - final String password = ConfigUtils.getPassword(conf); - final String auths = ConfigUtils.getAuthorizations(conf).toString(); - final String tableName = getTableName(conf); - final int numParitions = OptionalConfigUtils.getGeoNumPartitions(conf); - - final String featureSchemaFormat = "%~#s%" + numParitions + "#r%" + FEATURE_NAME - + "#cstr%0,3#gh%yyyyMMdd#d::%~#s%3,2#gh::%~#s%#id"; - // build the map of parameters - final Map<String, Serializable> params = new HashMap<String, Serializable>(); - params.put("instanceId", instanceId); - params.put("zookeepers", zookeepers); - params.put("user", user); - params.put("password", password); - params.put("auths", auths); - params.put("tableName", tableName); - params.put("indexSchemaFormat", featureSchemaFormat); - params.put("useMock", Boolean.toString(useMock)); - - // fetch the data store from the finder - return DataStoreFinder.getDataStore(params); - } - - private static SimpleFeatureType getStatementFeatureType(final DataStore dataStore) throws IOException, SchemaException { - SimpleFeatureType featureType; - - final String[] datastoreFeatures = dataStore.getTypeNames(); - if (Arrays.asList(datastoreFeatures).contains(FEATURE_NAME)) { - featureType = dataStore.getSchema(FEATURE_NAME); - } else { - final String featureSchema = SUBJECT_ATTRIBUTE + ":String," // - + PREDICATE_ATTRIBUTE + ":String," // - + OBJECT_ATTRIBUTE + ":String," // - + CONTEXT_ATTRIBUTE + ":String," // - + GEOMETRY_ATTRIBUTE + ":Geometry:srid=4326;geomesa.mixed.geometries='true'"; - featureType = SimpleFeatureTypes.createType(FEATURE_NAME, featureSchema); - dataStore.createSchema(featureType); - } - return featureType; - } - - @Override - public void storeStatements(final Collection<RyaStatement> ryaStatements) throws IOException { - // create a feature collection - final DefaultFeatureCollection featureCollection = new DefaultFeatureCollection(); - for (final RyaStatement ryaStatement : ryaStatements) { - final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement); - // if the predicate list is empty, accept all predicates. - // Otherwise, make sure the predicate is on the "valid" list - final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); - - if (isValidPredicate && (statement.getObject() instanceof Literal)) { - try { - final SimpleFeature feature = createFeature(featureType, statement); - featureCollection.add(feature); - } catch (final ParseException e) { - logger.warn("Error getting geo from statement: " + statement.toString(), e); - } - } - } - - // write this feature collection to the store - if (!featureCollection.isEmpty()) { - featureStore.addFeatures(featureCollection); - } - } - - @Override - public void storeStatement(final RyaStatement statement) throws IOException { - storeStatements(Collections.singleton(statement)); - } - - private static SimpleFeature createFeature(final SimpleFeatureType featureType, final Statement statement) throws ParseException { - final String subject = StatementSerializer.writeSubject(statement); - final String predicate = StatementSerializer.writePredicate(statement); - final String object = StatementSerializer.writeObject(statement); - final String context = StatementSerializer.writeContext(statement); - - // create the feature - final Object[] noValues = {}; - - // create the hash - final String statementId = Md5Hash.md5Base64(StatementSerializer.writeStatement(statement)); - final SimpleFeature newFeature = SimpleFeatureBuilder.build(featureType, noValues, statementId); - - // write the statement data to the fields - final Geometry geom = GeoParseUtils.getGeometry(statement); - if(geom == null || geom.isEmpty() || !geom.isValid()) { - throw new ParseException("Could not create geometry for statement " + statement); - } - newFeature.setDefaultGeometry(geom); - - newFeature.setAttribute(SUBJECT_ATTRIBUTE, subject); - newFeature.setAttribute(PREDICATE_ATTRIBUTE, predicate); - newFeature.setAttribute(OBJECT_ATTRIBUTE, object); - newFeature.setAttribute(CONTEXT_ATTRIBUTE, context); - - // preserve the ID that we created for this feature - // (set the hint to FALSE to have GeoTools generate IDs) - newFeature.getUserData().put(Hints.USE_PROVIDED_FID, java.lang.Boolean.TRUE); - - return newFeature; - } - - private CloseableIteration<Statement, QueryEvaluationException> performQuery(final String type, final Geometry geometry, - final StatementConstraints contraints) { - final List<String> filterParms = new ArrayList<String>(); - - filterParms.add(type + "(" + GEOMETRY_ATTRIBUTE + ", " + geometry + " )"); - - if (contraints.hasSubject()) { - filterParms.add("( " + SUBJECT_ATTRIBUTE + "= '" + contraints.getSubject() + "') "); - } - if (contraints.hasContext()) { - filterParms.add("( " + CONTEXT_ATTRIBUTE + "= '" + contraints.getContext() + "') "); - } - if (contraints.hasPredicates()) { - final List<String> predicates = new ArrayList<String>(); - for (final URI u : contraints.getPredicates()) { - predicates.add("( " + PREDICATE_ATTRIBUTE + "= '" + u.stringValue() + "') "); - } - filterParms.add("(" + StringUtils.join(predicates, " OR ") + ")"); - } - - final String filterString = StringUtils.join(filterParms, " AND "); - logger.info("Performing geomesa query : " + filterString); - - return getIteratorWrapper(filterString); - } - - private CloseableIteration<Statement, QueryEvaluationException> getIteratorWrapper(final String filterString) { - - return new CloseableIteration<Statement, QueryEvaluationException>() { - - private FeatureIterator<SimpleFeature> featureIterator = null; - - FeatureIterator<SimpleFeature> getIterator() throws QueryEvaluationException { - if (featureIterator == null) { - Filter cqlFilter; - try { - cqlFilter = ECQL.toFilter(filterString); - } catch (final CQLException e) { - logger.error("Error parsing query: " + filterString, e); - throw new QueryEvaluationException(e); - } - - final Query query = new Query(featureType.getTypeName(), cqlFilter); - try { - featureIterator = featureSource.getFeatures(query).features(); - } catch (final IOException e) { - logger.error("Error performing query: " + filterString, e); - throw new QueryEvaluationException(e); - } - } - return featureIterator; - } - - @Override - public boolean hasNext() throws QueryEvaluationException { - return getIterator().hasNext(); - } - - @Override - public Statement next() throws QueryEvaluationException { - final SimpleFeature feature = getIterator().next(); - final String subjectString = feature.getAttribute(SUBJECT_ATTRIBUTE).toString(); - final String predicateString = feature.getAttribute(PREDICATE_ATTRIBUTE).toString(); - final String objectString = feature.getAttribute(OBJECT_ATTRIBUTE).toString(); - final String contextString = feature.getAttribute(CONTEXT_ATTRIBUTE).toString(); - final Statement statement = StatementSerializer.readStatement(subjectString, predicateString, objectString, contextString); - return statement; - } - - @Override - public void remove() { - throw new UnsupportedOperationException("Remove not implemented"); - } - - @Override - public void close() throws QueryEvaluationException { - getIterator().close(); - } - }; - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryEquals(final Geometry query, final StatementConstraints contraints) { - return performQuery("EQUALS", query, contraints); - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryDisjoint(final Geometry query, final StatementConstraints contraints) { - return performQuery("DISJOINT", query, contraints); - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryIntersects(final Geometry query, final StatementConstraints contraints) { - return performQuery("INTERSECTS", query, contraints); - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryTouches(final Geometry query, final StatementConstraints contraints) { - return performQuery("TOUCHES", query, contraints); - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryCrosses(final Geometry query, final StatementConstraints contraints) { - return performQuery("CROSSES", query, contraints); - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryWithin(final Geometry query, final StatementConstraints contraints) { - return performQuery("WITHIN", query, contraints); - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryContains(final Geometry query, final StatementConstraints contraints) { - return performQuery("CONTAINS", query, contraints); - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryOverlaps(final Geometry query, final StatementConstraints contraints) { - return performQuery("OVERLAPS", query, contraints); - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryNear(final NearQuery query, - final StatementConstraints contraints) { - throw new UnsupportedOperationException("Near queries are not supported in Accumulo."); - } - - @Override - public Set<URI> getIndexablePredicates() { - return validPredicates; - } - - @Override - public void flush() throws IOException { - // TODO cache and flush features instead of writing them one at a time - } - - @Override - public void close() throws IOException { - flush(); - } - - - @Override - public String getTableName() { - return getTableName(conf); - } - - /** - * Get the Accumulo table that will be used by this index. - * @param conf - * @return table name guaranteed to be used by instances of this index - */ - public static String getTableName(final Configuration conf) { - return makeTableName( ConfigUtils.getTablePrefix(conf) ); - } - - /** - * Make the Accumulo table name used by this indexer for a specific instance of Rya. - * - * @param ryaInstanceName - The name of the Rya instance the table name is for. (not null) - * @return The Accumulo table name used by this indexer for a specific instance of Rya. - */ - public static String makeTableName(final String ryaInstanceName) { - requireNonNull(ryaInstanceName); - return ryaInstanceName + TABLE_SUFFIX; - } - - private void deleteStatements(final Collection<RyaStatement> ryaStatements) throws IOException { - // create a feature collection - final DefaultFeatureCollection featureCollection = new DefaultFeatureCollection(); - - for (final RyaStatement ryaStatement : ryaStatements) { - final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement); - // if the predicate list is empty, accept all predicates. - // Otherwise, make sure the predicate is on the "valid" list - final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); - - if (isValidPredicate && (statement.getObject() instanceof Literal)) { - try { - final SimpleFeature feature = createFeature(featureType, statement); - featureCollection.add(feature); - } catch (final ParseException e) { - logger.warn("Error getting geo from statement: " + statement.toString(), e); - } - } - } - - // remove this feature collection from the store - if (!featureCollection.isEmpty()) { - final Set<Identifier> featureIds = new HashSet<Identifier>(); - final FilterFactory filterFactory = CommonFactoryFinder.getFilterFactory(null); - final Set<String> stringIds = DataUtilities.fidSet(featureCollection); - for (final String id : stringIds) { - featureIds.add(filterFactory.featureId(id)); - } - final Filter filter = filterFactory.id(featureIds); - featureStore.removeFeatures(filter); - } - } - - - @Override - public void deleteStatement(final RyaStatement statement) throws IOException { - deleteStatements(Collections.singleton(statement)); - } - - @Override - public void init() { - } - - @Override - public void setConnector(final Connector connector) { - } - - @Override - public void destroy() { - } - - @Override - public void purge(final RdfCloudTripleStoreConfiguration configuration) { - } - - @Override - public void dropAndDestroy() { - } -}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoParseUtils.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoParseUtils.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoParseUtils.java deleted file mode 100644 index 103b241..0000000 --- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoParseUtils.java +++ /dev/null @@ -1,148 +0,0 @@ -package org.apache.rya.indexing.accumulo.geo; - -import java.io.IOException; -import java.io.Reader; -import java.io.StringReader; - -import javax.xml.parsers.ParserConfigurationException; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import org.apache.log4j.Logger; -import org.apache.rya.indexing.GeoConstants; -import org.geotools.gml3.GMLConfiguration; -import org.geotools.xml.Parser; -import org.openrdf.model.Literal; -import org.openrdf.model.Statement; -import org.openrdf.model.Value; -import org.openrdf.query.algebra.FunctionCall; -import org.openrdf.query.algebra.ValueConstant; -import org.openrdf.query.algebra.ValueExpr; -import org.openrdf.query.algebra.Var; -import org.xml.sax.SAXException; - -import com.vividsolutions.jts.geom.Geometry; -import com.vividsolutions.jts.io.ParseException; -import com.vividsolutions.jts.io.WKTReader; - -public class GeoParseUtils { - static final Logger logger = Logger.getLogger(GeoParseUtils.class); - /** - * @deprecated Not needed since geo literals may be WKT or GML. - * - * This method warns on a condition that must already be tested. Replaced by - * {@link #getLiteral(Statement)} and {@link #getGeometry(Statement} - * and getLiteral(statement).toString() - * and getLiteral(statement).getDatatype() - */ - @Deprecated - public static String getWellKnownText(final Statement statement) throws ParseException { - final Literal lit = getLiteral(statement); - if (!GeoConstants.XMLSCHEMA_OGC_WKT.equals(lit.getDatatype())) { - logger.warn("Literal is not of type " + GeoConstants.XMLSCHEMA_OGC_WKT + ": " + statement.toString()); - } - return lit.getLabel().toString(); - } - - public static Literal getLiteral(final Statement statement) throws ParseException { - final org.openrdf.model.Value v = statement.getObject(); - if (!(v instanceof Literal)) { - throw new ParseException("Statement does not contain Literal: " + statement.toString()); - } - final Literal lit = (Literal) v; - return lit; - } - - /** - * Parse GML/wkt literal to Geometry - * - * @param statement - * @return - * @throws ParseException - * @throws ParserConfigurationException - * @throws SAXException - * @throws IOException - */ - public static Geometry getGeometry(final Statement statement) throws ParseException { - // handle GML or WKT - final Literal lit = getLiteral(statement); - if (GeoConstants.XMLSCHEMA_OGC_WKT.equals(lit.getDatatype())) { - final String wkt = lit.getLabel().toString(); - return (new WKTReader()).read(wkt); - } else if (GeoConstants.XMLSCHEMA_OGC_GML.equals(lit.getDatatype())) { - final String gml = lit.getLabel().toString(); - try { - return getGeometryGml(gml); - } catch (IOException | SAXException | ParserConfigurationException e) { - throw new ParseException(e); - } - } else { - throw new ParseException("Literal is unknown geo type, expecting WKT or GML: " + statement.toString()); - } - } - /** - * Convert GML/XML string into a geometry that can be indexed. - * @param gmlString - * @return - * @throws IOException - * @throws SAXException - * @throws ParserConfigurationException - */ - public static Geometry getGeometryGml(final String gmlString) throws IOException, SAXException, ParserConfigurationException { - final Reader reader = new StringReader(gmlString); - final GMLConfiguration gmlConfiguration = new GMLConfiguration(); - final Parser gmlParser = new Parser(gmlConfiguration); - // gmlParser.setStrict(false); // attempt at allowing deprecated elements, but no. - // gmlParser.setValidating(false); - final Geometry geometry = (Geometry) gmlParser.parse(reader); - // This sometimes gets populated with the SRS/CRS: geometry.getUserData() - // Always returns 0 : geometry.getSRID() - //TODO geometry.setUserData(some default CRS); OR geometry.setSRID(some default CRS) - - return geometry; - } - - /** - * Extracts the arguments used in a {@link FunctionCall}. - * @param matchName - The variable name to match to arguments used in the {@link FunctionCall}. - * @param call - The {@link FunctionCall} to match against. - * @return - The {@link Value}s matched. - */ - public static Object[] extractArguments(final String matchName, final FunctionCall call) { - final Object[] args = new Object[call.getArgs().size() - 1]; - int argI = 0; - for (int i = 0; i != call.getArgs().size(); ++i) { - final ValueExpr arg = call.getArgs().get(i); - if (argI == i && arg instanceof Var && matchName.equals(((Var)arg).getName())) { - continue; - } - if (arg instanceof ValueConstant) { - args[argI] = ((ValueConstant)arg).getValue(); - } else if (arg instanceof Var && ((Var)arg).hasValue()) { - args[argI] = ((Var)arg).getValue(); - } else { - args[argI] = arg; - } - ++argI; - } - return args; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoTupleSet.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoTupleSet.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoTupleSet.java deleted file mode 100644 index 8cdeb5c..0000000 --- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoTupleSet.java +++ /dev/null @@ -1,499 +0,0 @@ -package org.apache.rya.indexing.accumulo.geo; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; - -import org.apache.commons.lang3.math.NumberUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.rya.indexing.GeoConstants; -import org.apache.rya.indexing.GeoIndexer; -import org.apache.rya.indexing.IndexingExpr; -import org.apache.rya.indexing.IteratorFactory; -import org.apache.rya.indexing.SearchFunction; -import org.apache.rya.indexing.StatementConstraints; -import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet; -import org.openrdf.model.Statement; -import org.openrdf.model.URI; -import org.openrdf.model.Value; -import org.openrdf.query.BindingSet; -import org.openrdf.query.QueryEvaluationException; -import org.openrdf.query.algebra.Var; - -import com.google.common.base.Joiner; -import com.google.common.collect.Maps; -import com.vividsolutions.jts.geom.Geometry; -import com.vividsolutions.jts.io.ParseException; -import com.vividsolutions.jts.io.WKTReader; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import info.aduna.iteration.CloseableIteration; -import joptsimple.internal.Strings; - -//Indexing Node for geo expressions to be inserted into execution plan -//to delegate geo portion of query to geo index -public class GeoTupleSet extends ExternalTupleSet { - private static final String NEAR_DELIM = "::"; - private final Configuration conf; - private final GeoIndexer geoIndexer; - private final IndexingExpr filterInfo; - - - public GeoTupleSet(final IndexingExpr filterInfo, final GeoIndexer geoIndexer) { - this.filterInfo = filterInfo; - this.geoIndexer = geoIndexer; - conf = geoIndexer.getConf(); - } - - @Override - public Set<String> getBindingNames() { - return filterInfo.getBindingNames(); - } - - @Override - public GeoTupleSet clone() { - return new GeoTupleSet(filterInfo, geoIndexer); - } - - @Override - public double cardinality() { - return 0.0; // No idea how the estimate cardinality here. - } - - - @Override - public String getSignature() { - return "(GeoTuple Projection) " + "variables: " + Joiner.on(", ").join(getBindingNames()).replaceAll("\\s+", " "); - } - - - - @Override - public boolean equals(final Object other) { - if (other == this) { - return true; - } - if (!(other instanceof GeoTupleSet)) { - return false; - } - final GeoTupleSet arg = (GeoTupleSet) other; - return filterInfo.equals(arg.filterInfo); - } - - @Override - public int hashCode() { - int result = 17; - result = 31*result + filterInfo.hashCode(); - - return result; - } - - - - /** - * Returns an iterator over the result set of the contained IndexingExpr. - * <p> - * Should be thread-safe (concurrent invocation {@link OfflineIterable} this - * method can be expected with some query evaluators. - */ - @Override - public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final BindingSet bindings) - throws QueryEvaluationException { - - final URI funcURI = filterInfo.getFunction(); - final SearchFunction searchFunction = new GeoSearchFunctionFactory(conf, geoIndexer).getSearchFunction(funcURI); - - String queryText; - Object arg = filterInfo.getArguments()[0]; - if (arg instanceof Value) { - queryText = ((Value) arg).stringValue(); - } else if (arg instanceof Var) { - queryText = bindings.getBinding(((Var) arg).getName()).getValue().stringValue(); - } else { - throw new IllegalArgumentException("Query text was not resolved"); - } - - if(funcURI.equals(GeoConstants.GEO_SF_NEAR)) { - if (filterInfo.getArguments().length > 3) { - throw new IllegalArgumentException("Near functions do not support more than four arguments."); - } - - final List<String> valueList = new ArrayList<>(); - for (final Object val : filterInfo.getArguments()) { - if (val instanceof Value) { - valueList.add(((Value)val).stringValue()); - } else if (val instanceof Var) { - valueList.add(bindings.getBinding(((Var) val).getName()).getValue().stringValue()); - } else { - throw new IllegalArgumentException("Query text was not resolved"); - } - } - queryText = Strings.join(valueList, NEAR_DELIM); - } else if (filterInfo.getArguments().length > 1) { - throw new IllegalArgumentException("Index functions do not support more than two arguments."); - } - - try { - final CloseableIteration<BindingSet, QueryEvaluationException> iterrez = IteratorFactory - .getIterator(filterInfo.getSpConstraint(), bindings, - queryText, searchFunction); - return iterrez; - } catch (final Exception e) { - System.out.println(e.getMessage()); - throw e; - } - } - - //returns appropriate search function for a given URI - //search functions used in GeoMesaGeoIndexer to access index - public static class GeoSearchFunctionFactory { - - Configuration conf; - - private final Map<URI, SearchFunction> SEARCH_FUNCTION_MAP = Maps.newHashMap(); - - private final GeoIndexer geoIndexer; - - public GeoSearchFunctionFactory(final Configuration conf, final GeoIndexer geoIndexer) { - this.conf = conf; - this.geoIndexer = geoIndexer; - } - - - /** - * Get a {@link GeoSearchFunction} for a given URI. - * - * @param searchFunction - * @return - */ - public SearchFunction getSearchFunction(final URI searchFunction) { - - SearchFunction geoFunc = null; - - try { - geoFunc = getSearchFunctionInternal(searchFunction); - } catch (final QueryEvaluationException e) { - e.printStackTrace(); - } - - return geoFunc; - } - - private SearchFunction getSearchFunctionInternal(final URI searchFunction) throws QueryEvaluationException { - final SearchFunction sf = SEARCH_FUNCTION_MAP.get(searchFunction); - - if (sf != null) { - return sf; - } else { - throw new QueryEvaluationException("Unknown Search Function: " + searchFunction.stringValue()); - } - } - - private final SearchFunction GEO_EQUALS = new SearchFunction() { - - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String queryText, - final StatementConstraints contraints) throws QueryEvaluationException { - try { - final WKTReader reader = new WKTReader(); - final Geometry geometry = reader.read(queryText); - final CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryEquals( - geometry, contraints); - return statements; - } catch (final ParseException e) { - throw new QueryEvaluationException(e); - } - } - - @Override - public String toString() { - return "GEO_EQUALS"; - }; - }; - - private final SearchFunction GEO_DISJOINT = new SearchFunction() { - - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String queryText, - final StatementConstraints contraints) throws QueryEvaluationException { - try { - final WKTReader reader = new WKTReader(); - final Geometry geometry = reader.read(queryText); - final CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryDisjoint( - geometry, contraints); - return statements; - } catch (final ParseException e) { - throw new QueryEvaluationException(e); - } - } - - @Override - public String toString() { - return "GEO_DISJOINT"; - }; - }; - - private final SearchFunction GEO_INTERSECTS = new SearchFunction() { - - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String queryText, - final StatementConstraints contraints) throws QueryEvaluationException { - try { - final WKTReader reader = new WKTReader(); - final Geometry geometry = reader.read(queryText); - final CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryIntersects( - geometry, contraints); - return statements; - } catch (final ParseException e) { - throw new QueryEvaluationException(e); - } - } - - @Override - public String toString() { - return "GEO_INTERSECTS"; - }; - }; - - private final SearchFunction GEO_TOUCHES = new SearchFunction() { - - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String queryText, - final StatementConstraints contraints) throws QueryEvaluationException { - try { - final WKTReader reader = new WKTReader(); - final Geometry geometry = reader.read(queryText); - final CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryTouches( - geometry, contraints); - return statements; - } catch (final ParseException e) { - throw new QueryEvaluationException(e); - } - } - - @Override - public String toString() { - return "GEO_TOUCHES"; - }; - }; - - private final SearchFunction GEO_CONTAINS = new SearchFunction() { - - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String queryText, - final StatementConstraints contraints) throws QueryEvaluationException { - try { - final WKTReader reader = new WKTReader(); - final Geometry geometry = reader.read(queryText); - final CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryContains( - geometry, contraints); - return statements; - } catch (final ParseException e) { - throw new QueryEvaluationException(e); - } - } - - @Override - public String toString() { - return "GEO_CONTAINS"; - }; - }; - - private final SearchFunction GEO_OVERLAPS = new SearchFunction() { - - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String queryText, - final StatementConstraints contraints) throws QueryEvaluationException { - try { - final WKTReader reader = new WKTReader(); - final Geometry geometry = reader.read(queryText); - final CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryOverlaps( - geometry, contraints); - return statements; - } catch (final ParseException e) { - throw new QueryEvaluationException(e); - } - } - - @Override - public String toString() { - return "GEO_OVERLAPS"; - }; - }; - - private final SearchFunction GEO_CROSSES = new SearchFunction() { - - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String queryText, - final StatementConstraints contraints) throws QueryEvaluationException { - try { - final WKTReader reader = new WKTReader(); - final Geometry geometry = reader.read(queryText); - final CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryCrosses( - geometry, contraints); - return statements; - } catch (final ParseException e) { - throw new QueryEvaluationException(e); - } - } - - @Override - public String toString() { - return "GEO_CROSSES"; - }; - }; - - private final SearchFunction GEO_WITHIN = new SearchFunction() { - - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String queryText, - final StatementConstraints contraints) throws QueryEvaluationException { - try { - final WKTReader reader = new WKTReader(); - final Geometry geometry = reader.read(queryText); - final CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( - geometry, contraints); - return statements; - } catch (final ParseException e) { - throw new QueryEvaluationException(e); - } - } - - @Override - public String toString() { - return "GEO_WITHIN"; - }; - }; - - private final SearchFunction GEO_NEAR = new SearchFunction() { - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String queryText, - final StatementConstraints contraints) throws QueryEvaluationException { - try { - final String[] args = queryText.split(NEAR_DELIM); - Optional<Double> maxDistanceOpt = Optional.empty(); - Optional<Double> minDistanceOpt = Optional.empty(); - final String query = args[0]; - - for (int ii = 1; ii < args.length; ii++) { - String numArg = args[ii]; - - // remove pre-padding 0's since NumberUtils.isNumber() - // will assume its octal if it starts with a 0. - while (numArg.startsWith("0")) { - numArg = numArg.substring(1); - } - // was 0 - if (numArg.equals("")) { - // if max hasn't been set, set it to 0. - // Otherwise, min is just ignored. - if (!maxDistanceOpt.isPresent()) { - maxDistanceOpt = Optional.of(0.0); - } - } else { - if (!maxDistanceOpt.isPresent() && NumberUtils.isNumber(numArg)) { - // no variable identifier, going by order. - maxDistanceOpt = getDistanceOpt(numArg, "maxDistance"); - } else if (NumberUtils.isNumber(numArg)) { - // no variable identifier, going by order. - minDistanceOpt = getDistanceOpt(numArg, "minDistance"); - } else { - throw new IllegalArgumentException(numArg + " is not a valid Near function argument."); - } - } - } - final WKTReader reader = new WKTReader(); - final Geometry geometry = reader.read(query); - final NearQuery nearQuery = new NearQuery(maxDistanceOpt, minDistanceOpt, geometry); - return geoIndexer.queryNear(nearQuery, contraints); - } catch (final ParseException e) { - throw new QueryEvaluationException(e); - } - } - - private Optional<Double> getDistanceOpt(final String num, final String name) { - try { - double dist = Double.parseDouble(num); - if(dist < 0) { - throw new IllegalArgumentException("Value for: " + name + " must be non-negative."); - } - return Optional.of(Double.parseDouble(num)); - } catch (final NumberFormatException nfe) { - throw new IllegalArgumentException("Value for: " + name + " must be a number."); - } - } - - @Override - public String toString() { - return "GEO_NEAR"; - } - }; - - /** - * - */ - public class NearQuery { - private final Optional<Double> maxDistanceOpt; - private final Optional<Double> minDistanceOpt; - private final Geometry geo; - - /** - * - * @param maxDistance - * @param minDistance - * @param geo - */ - public NearQuery(final Optional<Double> maxDistance, final Optional<Double> minDistance, - final Geometry geo) { - maxDistanceOpt = maxDistance; - minDistanceOpt = minDistance; - this.geo = geo; - } - - public Optional<Double> getMaxDistance() { - return maxDistanceOpt; - } - - public Optional<Double> getMinDistance() { - return minDistanceOpt; - } - - public Geometry getGeometry() { - return geo; - } - } - - { - SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_EQUALS, GEO_EQUALS); - SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_DISJOINT, GEO_DISJOINT); - SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_INTERSECTS, GEO_INTERSECTS); - SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_TOUCHES, GEO_TOUCHES); - SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_CONTAINS, GEO_CONTAINS); - SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_OVERLAPS, GEO_OVERLAPS); - SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_CROSSES, GEO_CROSSES); - SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_WITHIN, GEO_WITHIN); - SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_NEAR, GEO_NEAR); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoWaveGeoIndexer.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoWaveGeoIndexer.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoWaveGeoIndexer.java deleted file mode 100644 index 45a23f9..0000000 --- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoWaveGeoIndexer.java +++ /dev/null @@ -1,668 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.indexing.accumulo.geo; - -import static com.google.common.base.Preconditions.checkNotNull; -import static java.util.Objects.requireNonNull; - -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; - -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.log4j.Logger; -import org.apache.rya.accumulo.experimental.AbstractAccumuloIndexer; -import org.apache.rya.api.RdfCloudTripleStoreConfiguration; -import org.apache.rya.api.domain.RyaStatement; -import org.apache.rya.api.resolver.RyaToRdfConversions; -import org.apache.rya.indexing.GeoIndexer; -import org.apache.rya.indexing.Md5Hash; -import org.apache.rya.indexing.StatementConstraints; -import org.apache.rya.indexing.StatementSerializer; -import org.apache.rya.indexing.accumulo.ConfigUtils; -import org.apache.rya.indexing.accumulo.geo.GeoTupleSet.GeoSearchFunctionFactory.NearQuery; -import org.geotools.data.DataStore; -import org.geotools.data.DataUtilities; -import org.geotools.data.FeatureSource; -import org.geotools.data.FeatureStore; -import org.geotools.factory.CommonFactoryFinder; -import org.geotools.factory.Hints; -import org.geotools.feature.DefaultFeatureCollection; -import org.geotools.feature.SchemaException; -import org.geotools.feature.simple.SimpleFeatureBuilder; -import org.geotools.filter.text.cql2.CQLException; -import org.geotools.filter.text.ecql.ECQL; -import org.opengis.feature.simple.SimpleFeature; -import org.opengis.feature.simple.SimpleFeatureType; -import org.opengis.filter.Filter; -import org.opengis.filter.FilterFactory; -import org.opengis.filter.identity.Identifier; -import org.openrdf.model.Literal; -import org.openrdf.model.Statement; -import org.openrdf.model.URI; -import org.openrdf.query.QueryEvaluationException; - -import com.vividsolutions.jts.geom.Geometry; -import com.vividsolutions.jts.io.ParseException; - -import info.aduna.iteration.CloseableIteration; -import mil.nga.giat.geowave.adapter.vector.FeatureDataAdapter; -import mil.nga.giat.geowave.adapter.vector.plugin.GeoWaveGTDataStore; -import mil.nga.giat.geowave.adapter.vector.plugin.GeoWaveGTDataStoreFactory; -import mil.nga.giat.geowave.adapter.vector.plugin.GeoWavePluginException; -import mil.nga.giat.geowave.adapter.vector.query.cql.CQLQuery; -import mil.nga.giat.geowave.core.geotime.ingest.SpatialDimensionalityTypeProvider; -import mil.nga.giat.geowave.core.store.CloseableIterator; -import mil.nga.giat.geowave.core.store.StoreFactoryFamilySpi; -import mil.nga.giat.geowave.core.store.index.PrimaryIndex; -import mil.nga.giat.geowave.core.store.memory.MemoryStoreFactoryFamily; -import mil.nga.giat.geowave.core.store.query.EverythingQuery; -import mil.nga.giat.geowave.core.store.query.QueryOptions; -import mil.nga.giat.geowave.datastore.accumulo.AccumuloDataStore; -import mil.nga.giat.geowave.datastore.accumulo.AccumuloStoreFactoryFamily; - -/** - * A {@link GeoIndexer} wrapper around a GeoWave {@link AccumuloDataStore}. This class configures and connects to the Datastore, creates the - * RDF Feature Type, and interacts with the Datastore. - * <p> - * Specifically, this class creates a RDF Feature type and stores each RDF Statement as a RDF Feature in the datastore. Each feature - * contains the standard set of GeoWave attributes (Geometry, Start Date, and End Date). The GeoWaveGeoIndexer populates the Geometry - * attribute by parsing the Well-Known Text contained in the RDF Statementâs object literal value. - * <p> - * The RDF Feature contains four additional attributes for each component of the RDF Statement. These attributes are: - * <p> - * <table border="1"> - * <tr> - * <th>Name</th> - * <th>Symbol</th> - * <th>Type</th> - * </tr> - * <tr> - * <td>Subject Attribute</td> - * <td>S</td> - * <td>String</td> - * </tr> - * </tr> - * <tr> - * <td>Predicate Attribute</td> - * <td>P</td> - * <td>String</td> - * </tr> - * </tr> - * <tr> - * <td>Object Attribute</td> - * <td>O</td> - * <td>String</td> - * </tr> - * </tr> - * <tr> - * <td>Context Attribute</td> - * <td>C</td> - * <td>String</td> - * </tr> - * </table> - */ -public class GeoWaveGeoIndexer extends AbstractAccumuloIndexer implements GeoIndexer { - - private static final String TABLE_SUFFIX = "geo"; - - private static final Logger logger = Logger.getLogger(GeoWaveGeoIndexer.class); - - private static final String FEATURE_NAME = "RDF"; - - private static final String SUBJECT_ATTRIBUTE = "S"; - private static final String PREDICATE_ATTRIBUTE = "P"; - private static final String OBJECT_ATTRIBUTE = "O"; - private static final String CONTEXT_ATTRIBUTE = "C"; - private static final String GEO_ID_ATTRIBUTE = "geo_id"; - private static final String GEOMETRY_ATTRIBUTE = "geowave_index_geometry"; - - private Set<URI> validPredicates; - private Configuration conf; - private FeatureStore<SimpleFeatureType, SimpleFeature> featureStore; - private FeatureSource<SimpleFeatureType, SimpleFeature> featureSource; - private SimpleFeatureType featureType; - private FeatureDataAdapter featureDataAdapter; - private DataStore geoToolsDataStore; - private mil.nga.giat.geowave.core.store.DataStore geoWaveDataStore; - private final PrimaryIndex index = new SpatialDimensionalityTypeProvider().createPrimaryIndex(); - private boolean isInit = false; - - //initialization occurs in setConf because index is created using reflection - @Override - public void setConf(final Configuration conf) { - this.conf = conf; - if (!isInit) { - try { - initInternal(); - isInit = true; - } catch (final IOException e) { - logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); - throw new RuntimeException(e); - } - } - } - - @Override - public Configuration getConf() { - return conf; - } - - /** - * @return the internal GeoTools{@link DataStore} used by the {@link GeoWaveGeoIndexer}. - */ - public DataStore getGeoToolsDataStore() { - return geoToolsDataStore; - } - - /** - * @return the internal GeoWave {@link DataStore} used by the {@link GeoWaveGeoIndexer}. - */ - public mil.nga.giat.geowave.core.store.DataStore getGeoWaveDataStore() { - return geoWaveDataStore; - } - - private void initInternal() throws IOException { - validPredicates = ConfigUtils.getGeoPredicates(conf); - - try { - geoToolsDataStore = createDataStore(conf); - geoWaveDataStore = ((GeoWaveGTDataStore) geoToolsDataStore).getDataStore(); - } catch (final GeoWavePluginException e) { - logger.error("Failed to create GeoWave data store", e); - } - - try { - featureType = getStatementFeatureType(geoToolsDataStore); - } catch (final IOException | SchemaException e) { - throw new IOException(e); - } - - featureDataAdapter = new FeatureDataAdapter(featureType); - - featureSource = geoToolsDataStore.getFeatureSource(featureType.getName()); - if (!(featureSource instanceof FeatureStore)) { - throw new IllegalStateException("Could not retrieve feature store"); - } - featureStore = (FeatureStore<SimpleFeatureType, SimpleFeature>) featureSource; - } - - public Map<String, Serializable> getParams(final Configuration conf) { - // get the configuration parameters - final Instance instance = ConfigUtils.getInstance(conf); - final String instanceId = instance.getInstanceName(); - final String zookeepers = instance.getZooKeepers(); - final String user = ConfigUtils.getUsername(conf); - final String password = ConfigUtils.getPassword(conf); - final String auths = ConfigUtils.getAuthorizations(conf).toString(); - final String tableName = getTableName(conf); - final String tablePrefix = ConfigUtils.getTablePrefix(conf); - - final Map<String, Serializable> params = new HashMap<>(); - params.put("zookeeper", zookeepers); - params.put("instance", instanceId); - params.put("user", user); - params.put("password", password); - params.put("namespace", tableName); - params.put("gwNamespace", tablePrefix + getClass().getSimpleName()); - - params.put("Lock Management", LockManagementType.MEMORY.toString()); - params.put("Authorization Management Provider", AuthorizationManagementProviderType.EMPTY.toString()); - params.put("Authorization Data URL", null); - params.put("Transaction Buffer Size", 10000); - params.put("Query Index Strategy", QueryIndexStrategyType.HEURISTIC_MATCH.toString()); - return params; - } - - /** - * Creates the {@link DataStore} for the {@link GeoWaveGeoIndexer}. - * @param conf the {@link Configuration}. - * @return the {@link DataStore}. - */ - public DataStore createDataStore(final Configuration conf) throws IOException, GeoWavePluginException { - final Map<String, Serializable> params = getParams(conf); - final Instance instance = ConfigUtils.getInstance(conf); - final boolean useMock = instance instanceof MockInstance; - - final StoreFactoryFamilySpi storeFactoryFamily; - if (useMock) { - storeFactoryFamily = new MemoryStoreFactoryFamily(); - } else { - storeFactoryFamily = new AccumuloStoreFactoryFamily(); - } - - final GeoWaveGTDataStoreFactory geoWaveGTDataStoreFactory = new GeoWaveGTDataStoreFactory(storeFactoryFamily); - final DataStore dataStore = geoWaveGTDataStoreFactory.createNewDataStore(params); - - return dataStore; - } - - private static SimpleFeatureType getStatementFeatureType(final DataStore dataStore) throws IOException, SchemaException { - SimpleFeatureType featureType; - - final String[] datastoreFeatures = dataStore.getTypeNames(); - if (Arrays.asList(datastoreFeatures).contains(FEATURE_NAME)) { - featureType = dataStore.getSchema(FEATURE_NAME); - } else { - featureType = DataUtilities.createType(FEATURE_NAME, - SUBJECT_ATTRIBUTE + ":String," + - PREDICATE_ATTRIBUTE + ":String," + - OBJECT_ATTRIBUTE + ":String," + - CONTEXT_ATTRIBUTE + ":String," + - GEOMETRY_ATTRIBUTE + ":Geometry:srid=4326," + - GEO_ID_ATTRIBUTE + ":String"); - - dataStore.createSchema(featureType); - } - return featureType; - } - - @Override - public void storeStatements(final Collection<RyaStatement> ryaStatements) throws IOException { - // create a feature collection - final DefaultFeatureCollection featureCollection = new DefaultFeatureCollection(); - for (final RyaStatement ryaStatement : ryaStatements) { - final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement); - // if the predicate list is empty, accept all predicates. - // Otherwise, make sure the predicate is on the "valid" list - final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); - - if (isValidPredicate && (statement.getObject() instanceof Literal)) { - try { - final SimpleFeature feature = createFeature(featureType, statement); - featureCollection.add(feature); - } catch (final ParseException e) { - logger.warn("Error getting geo from statement: " + statement.toString(), e); - } - } - } - - // write this feature collection to the store - if (!featureCollection.isEmpty()) { - featureStore.addFeatures(featureCollection); - } - } - - @Override - public void storeStatement(final RyaStatement statement) throws IOException { - storeStatements(Collections.singleton(statement)); - } - - private static SimpleFeature createFeature(final SimpleFeatureType featureType, final Statement statement) throws ParseException { - final String subject = StatementSerializer.writeSubject(statement); - final String predicate = StatementSerializer.writePredicate(statement); - final String object = StatementSerializer.writeObject(statement); - final String context = StatementSerializer.writeContext(statement); - - // create the feature - final Object[] noValues = {}; - - // create the hash - final String statementId = Md5Hash.md5Base64(StatementSerializer.writeStatement(statement)); - final SimpleFeature newFeature = SimpleFeatureBuilder.build(featureType, noValues, statementId); - - // write the statement data to the fields - final Geometry geom = GeoParseUtils.getGeometry(statement); - if(geom == null || geom.isEmpty() || !geom.isValid()) { - throw new ParseException("Could not create geometry for statement " + statement); - } - newFeature.setDefaultGeometry(geom); - - newFeature.setAttribute(SUBJECT_ATTRIBUTE, subject); - newFeature.setAttribute(PREDICATE_ATTRIBUTE, predicate); - newFeature.setAttribute(OBJECT_ATTRIBUTE, object); - newFeature.setAttribute(CONTEXT_ATTRIBUTE, context); - // GeoWave does not support querying based on a user generated feature ID - // So, we create a separate ID attribute that it can query on. - newFeature.setAttribute(GEO_ID_ATTRIBUTE, statementId); - - // preserve the ID that we created for this feature - // (set the hint to FALSE to have GeoTools generate IDs) - newFeature.getUserData().put(Hints.USE_PROVIDED_FID, java.lang.Boolean.TRUE); - - return newFeature; - } - - private CloseableIteration<Statement, QueryEvaluationException> performQuery(final String type, final Geometry geometry, - final StatementConstraints contraints) { - final List<String> filterParms = new ArrayList<String>(); - - filterParms.add(type + "(" + GEOMETRY_ATTRIBUTE + ", " + geometry + " )"); - - if (contraints.hasSubject()) { - filterParms.add("( " + SUBJECT_ATTRIBUTE + "= '" + contraints.getSubject() + "') "); - } - if (contraints.hasContext()) { - filterParms.add("( " + CONTEXT_ATTRIBUTE + "= '" + contraints.getContext() + "') "); - } - if (contraints.hasPredicates()) { - final List<String> predicates = new ArrayList<String>(); - for (final URI u : contraints.getPredicates()) { - predicates.add("( " + PREDICATE_ATTRIBUTE + "= '" + u.stringValue() + "') "); - } - filterParms.add("(" + StringUtils.join(predicates, " OR ") + ")"); - } - - final String filterString = StringUtils.join(filterParms, " AND "); - logger.info("Performing geowave query : " + filterString); - - return getIteratorWrapper(filterString); - } - - private CloseableIteration<Statement, QueryEvaluationException> getIteratorWrapper(final String filterString) { - - return new CloseableIteration<Statement, QueryEvaluationException>() { - - private CloseableIterator<SimpleFeature> featureIterator = null; - - CloseableIterator<SimpleFeature> getIterator() throws QueryEvaluationException { - if (featureIterator == null) { - Filter cqlFilter; - try { - cqlFilter = ECQL.toFilter(filterString); - } catch (final CQLException e) { - logger.error("Error parsing query: " + filterString, e); - throw new QueryEvaluationException(e); - } - - final CQLQuery cqlQuery = new CQLQuery(null, cqlFilter, featureDataAdapter); - final QueryOptions queryOptions = new QueryOptions(featureDataAdapter, index); - - try { - featureIterator = geoWaveDataStore.query(queryOptions, cqlQuery); - } catch (final Exception e) { - logger.error("Error performing query: " + filterString, e); - throw new QueryEvaluationException(e); - } - } - return featureIterator; - } - - @Override - public boolean hasNext() throws QueryEvaluationException { - return getIterator().hasNext(); - } - - @Override - public Statement next() throws QueryEvaluationException { - final SimpleFeature feature = getIterator().next(); - final String subjectString = feature.getAttribute(SUBJECT_ATTRIBUTE).toString(); - final String predicateString = feature.getAttribute(PREDICATE_ATTRIBUTE).toString(); - final String objectString = feature.getAttribute(OBJECT_ATTRIBUTE).toString(); - final Object context = feature.getAttribute(CONTEXT_ATTRIBUTE); - final String contextString = context != null ? context.toString() : ""; - final Statement statement = StatementSerializer.readStatement(subjectString, predicateString, objectString, contextString); - return statement; - } - - @Override - public void remove() { - throw new UnsupportedOperationException("Remove not implemented"); - } - - @Override - public void close() throws QueryEvaluationException { - try { - getIterator().close(); - } catch (final IOException e) { - throw new QueryEvaluationException(e); - } - } - }; - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryEquals(final Geometry query, final StatementConstraints contraints) { - return performQuery("EQUALS", query, contraints); - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryDisjoint(final Geometry query, final StatementConstraints contraints) { - return performQuery("DISJOINT", query, contraints); - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryIntersects(final Geometry query, final StatementConstraints contraints) { - return performQuery("INTERSECTS", query, contraints); - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryTouches(final Geometry query, final StatementConstraints contraints) { - return performQuery("TOUCHES", query, contraints); - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryCrosses(final Geometry query, final StatementConstraints contraints) { - return performQuery("CROSSES", query, contraints); - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryWithin(final Geometry query, final StatementConstraints contraints) { - return performQuery("WITHIN", query, contraints); - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryContains(final Geometry query, final StatementConstraints contraints) { - return performQuery("CONTAINS", query, contraints); - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryOverlaps(final Geometry query, final StatementConstraints contraints) { - return performQuery("OVERLAPS", query, contraints); - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryNear(final NearQuery query, - final StatementConstraints contraints) { - throw new UnsupportedOperationException("Near queries are not supported in Accumulo."); - } - - @Override - public Set<URI> getIndexablePredicates() { - return validPredicates; - } - - @Override - public void flush() throws IOException { - // TODO cache and flush features instead of writing them one at a time - } - - @Override - public void close() throws IOException { - flush(); - } - - - @Override - public String getTableName() { - return getTableName(conf); - } - - /** - * Get the Accumulo table that will be used by this index. - * @param conf - * @return table name guaranteed to be used by instances of this index - */ - public static String getTableName(final Configuration conf) { - return makeTableName( ConfigUtils.getTablePrefix(conf) ); - } - - /** - * Make the Accumulo table name used by this indexer for a specific instance of Rya. - * - * @param ryaInstanceName - The name of the Rya instance the table name is for. (not null) - * @return The Accumulo table name used by this indexer for a specific instance of Rya. - */ - public static String makeTableName(final String ryaInstanceName) { - requireNonNull(ryaInstanceName); - return ryaInstanceName + TABLE_SUFFIX; - } - - private void deleteStatements(final Collection<RyaStatement> ryaStatements) throws IOException { - // create a feature collection - final DefaultFeatureCollection featureCollection = new DefaultFeatureCollection(); - - for (final RyaStatement ryaStatement : ryaStatements) { - final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement); - // if the predicate list is empty, accept all predicates. - // Otherwise, make sure the predicate is on the "valid" list - final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); - - if (isValidPredicate && (statement.getObject() instanceof Literal)) { - try { - final SimpleFeature feature = createFeature(featureType, statement); - featureCollection.add(feature); - } catch (final ParseException e) { - logger.warn("Error getting geo from statement: " + statement.toString(), e); - } - } - } - - // remove this feature collection from the store - if (!featureCollection.isEmpty()) { - final Set<Identifier> featureIds = new HashSet<Identifier>(); - final FilterFactory filterFactory = CommonFactoryFinder.getFilterFactory(null); - final Set<String> stringIds = DataUtilities.fidSet(featureCollection); - for (final String id : stringIds) { - featureIds.add(filterFactory.featureId(id)); - } - - final String filterString = stringIds.stream().collect(Collectors.joining("','", "'", "'")); - - Filter filter = null; - try { - filter = ECQL.toFilter(GEO_ID_ATTRIBUTE + " IN (" + filterString + ")", filterFactory); - } catch (final CQLException e) { - logger.error("Unable to generate filter for deleting the statement.", e); - } - - featureStore.removeFeatures(filter); - } - } - - - @Override - public void deleteStatement(final RyaStatement statement) throws IOException { - deleteStatements(Collections.singleton(statement)); - } - - @Override - public void init() { - } - - @Override - public void setConnector(final Connector connector) { - } - - @Override - public void destroy() { - } - - @Override - public void purge(final RdfCloudTripleStoreConfiguration configuration) { - // delete existing data - geoWaveDataStore.delete(new QueryOptions(), new EverythingQuery()); - } - - @Override - public void dropAndDestroy() { - } - - /** - * The list of supported Geo Wave {@code LockingManagementFactory} types. - */ - private static enum LockManagementType { - MEMORY("memory"); - - private final String name; - - /** - * Creates a new {@link LockManagementType}. - * @param name the name of the type. (not {@code null}) - */ - private LockManagementType(final String name) { - this.name = checkNotNull(name); - } - - @Override - public String toString() { - return name; - } - } - - /** - * The list of supported Geo Wave {@code AuthorizationFactorySPI } types. - */ - private static enum AuthorizationManagementProviderType { - EMPTY("empty"), - JSON_FILE("jsonFile"); - - private final String name; - - /** - * Creates a new {@link AuthorizationManagementProviderType}. - * @param name the name of the type. (not {@code null}) - */ - private AuthorizationManagementProviderType(final String name) { - this.name = checkNotNull(name); - } - - @Override - public String toString() { - return name; - } - } - - /** - * The list of supported Geo Wave {@code IndexQueryStrategySPI} types. - */ - private static enum QueryIndexStrategyType { - BEST_MATCH("Best Match"), - HEURISTIC_MATCH("Heuristic Match"), - PRESERVE_LOCALITY("Preserve Locality"); - - private final String name; - - /** - * Creates a new {@link QueryIndexStrategyType}. - * @param name the name of the type. (not {@code null}) - */ - private QueryIndexStrategyType(final String name) { - this.name = checkNotNull(name); - } - - @Override - public String toString() { - return name; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalExternalSetMatcherFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalExternalSetMatcherFactory.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalExternalSetMatcherFactory.java deleted file mode 100644 index c4a287e..0000000 --- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalExternalSetMatcherFactory.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.indexing.geotemporal; - -import org.apache.rya.indexing.external.matching.AbstractExternalSetMatcherFactory; -import org.apache.rya.indexing.external.matching.ExternalSetMatcher; -import org.apache.rya.indexing.external.matching.JoinSegment; -import org.apache.rya.indexing.external.matching.JoinSegmentMatcher; -import org.apache.rya.indexing.external.matching.OptionalJoinSegment; -import org.apache.rya.indexing.external.matching.OptionalJoinSegmentMatcher; -import org.apache.rya.indexing.geotemporal.model.EventQueryNode; - -/** - * Factory used to build {@link EntityQueryNodeMatcher}s for the {@link GeoTemporalIndexOptimizer}. - * - */ -public class GeoTemporalExternalSetMatcherFactory extends AbstractExternalSetMatcherFactory<EventQueryNode> { - - @Override - protected ExternalSetMatcher<EventQueryNode> getJoinSegmentMatcher(final JoinSegment<EventQueryNode> segment) { - return new JoinSegmentMatcher<EventQueryNode>(segment, new GeoTemporalToSegmentConverter()); - } - - @Override - protected ExternalSetMatcher<EventQueryNode> getOptionalJoinSegmentMatcher(final OptionalJoinSegment<EventQueryNode> segment) { - return new OptionalJoinSegmentMatcher<EventQueryNode>(segment, new GeoTemporalToSegmentConverter()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexException.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexException.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexException.java deleted file mode 100644 index b2d4de5..0000000 --- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexException.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.indexing.geotemporal; - -import org.apache.rya.indexing.entity.model.TypedEntity; - -/** - * An operation over the {@link TypedEntity} index failed to complete. - */ -public class GeoTemporalIndexException extends Exception { - private static final long serialVersionUID = 1L; - - /** - * Constructs a new exception with the specified detail message. The - * cause is not initialized, and may subsequently be initialized by - * a call to {@link #initCause}. - * - * @param message the detail message. The detail message is saved for - * later retrieval by the {@link #getMessage()} method. - */ - public GeoTemporalIndexException(final String message) { - super(message); - } - - /** - * Constructs a new exception with the specified detail message and - * cause. <p>Note that the detail message associated with - * {@code cause} is <i>not</i> automatically incorporated in - * this exception's detail message. - * - * @param message the detail message (which is saved for later retrieval - * by the {@link #getMessage()} method). - * @param cause the cause (which is saved for later retrieval by the - * {@link #getCause()} method). (A <tt>null</tt> value is - * permitted, and indicates that the cause is nonexistent or - * unknown.) - */ - public GeoTemporalIndexException(final String message, final Throwable cause) { - super(message, cause); - } -} \ No newline at end of file
