RYA-377 Geo Filter support. Added geo filter support to Rya Streams.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/9cd0c568 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/9cd0c568 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/9cd0c568 Branch: refs/heads/master Commit: 9cd0c568d14f104a1604c2b5425a7fe9d154fc24 Parents: 9442322 Author: Andrew Smith <[email protected]> Authored: Tue Nov 28 13:12:57 2017 -0500 Committer: caleb <[email protected]> Committed: Tue Jan 9 15:13:01 2018 -0500 ---------------------------------------------------------------------- .../rya.pcj.fluo/rya.pcj.functions.geo/pom.xml | 34 +---- extras/rya.streams/kafka/pom.xml | 22 ++- .../kafka/processors/filter/GeoFilterIT.java | 139 +++++++++++++++++++ 3 files changed, 162 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9cd0c568/extras/rya.pcj.fluo/rya.pcj.functions.geo/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/rya.pcj.functions.geo/pom.xml b/extras/rya.pcj.fluo/rya.pcj.functions.geo/pom.xml index b9b2143..373d869 100644 --- a/extras/rya.pcj.fluo/rya.pcj.functions.geo/pom.xml +++ b/extras/rya.pcj.fluo/rya.pcj.functions.geo/pom.xml @@ -40,35 +40,15 @@ under the License. 
<artifactId>rya.api</artifactId> </dependency> <dependency> - <groupId>org.apache.rya</groupId> - <artifactId>rya.pcj.fluo.api</artifactId> - </dependency> - <dependency> <groupId>com.vividsolutions</groupId> <artifactId>jts</artifactId> <version>1.13</version> </dependency> <dependency> - <groupId>org.apache.rya</groupId> - <artifactId>rya.pcj.fluo.client</artifactId> - </dependency> - <dependency> - <groupId>org.apache.rya</groupId> - <artifactId>rya.indexing</artifactId> - </dependency> - <dependency> - <groupId>org.apache.rya</groupId> - <artifactId>rya.pcj.fluo.test.base</artifactId> - </dependency> - <dependency> <groupId>org.eclipse.rdf4j</groupId> <artifactId>rdf4j-queryalgebra-geosparql</artifactId> <version>2.2</version> </dependency> - <dependency> - <groupId>org.apache.fluo</groupId> - <artifactId>fluo-api</artifactId> - </dependency> <dependency> <groupId>org.eclipse.rdf4j</groupId> <artifactId>rdf4j-queryalgebra-evaluation</artifactId> @@ -76,18 +56,8 @@ under the License. </dependency> <!-- Testing dependencies. --> <dependency> - <groupId>org.apache.fluo</groupId> - <artifactId>fluo-mini</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.fluo</groupId> - <artifactId>fluo-recipes-test</artifactId> + <groupId>org.apache.rya</groupId> + <artifactId>rya.pcj.fluo.test.base</artifactId> <scope>test</scope> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9cd0c568/extras/rya.streams/kafka/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/pom.xml b/extras/rya.streams/kafka/pom.xml index 0ccbb6e..2d33f32 100644 --- a/extras/rya.streams/kafka/pom.xml +++ b/extras/rya.streams/kafka/pom.xml @@ -33,6 +33,26 @@ under the License. This module contains the Rya Streams components that integrate with Kafka. 
</description> + <profiles> + <profile> + <id>geoindexing</id> + <dependencies> + <!-- Rya dependencies --> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.pcj.functions.geo</artifactId> + <version>3.2.12-incubating-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.geo.common</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + </profile> + </profiles> + + <dependencies> <!-- Rya dependencies --> <dependency> @@ -95,4 +115,4 @@ under the License. <scope>test</scope> </dependency> </dependencies> -</project> \ No newline at end of file +</project> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9cd0c568/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java new file mode 100644 index 0000000..4057e18 --- /dev/null +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.processors.filter; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.rya.api.function.projection.RandomUUIDFactory; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.model.VisibilityStatement; +import org.apache.rya.indexing.GeoConstants; +import org.apache.rya.streams.kafka.KafkaTestUtil; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; +import org.apache.rya.streams.kafka.topology.TopologyFactory; +import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.junit.Rule; +import org.junit.Test; +import org.openrdf.model.Resource; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.StatementImpl; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.algebra.evaluation.function.Function; +import org.openrdf.query.algebra.evaluation.function.FunctionRegistry; +import org.openrdf.query.impl.MapBindingSet; + +import com.vividsolutions.jts.geom.Coordinate; +import com.vividsolutions.jts.geom.Geometry; +import 
com.vividsolutions.jts.geom.GeometryFactory; +import com.vividsolutions.jts.io.WKTWriter; + +/** + * Integration tests the methods of {@link FilterProcessor}. + */ +public class GeoFilterIT { + private static final String GEO = "http://www.opengis.net/def/function/geosparql/"; + private static final GeometryFactory GF = new GeometryFactory(); + private static final Geometry ZERO = GF.createPoint(new Coordinate(0, 0)); + private static final Geometry ONE = GF.createPoint(new Coordinate(1, 1)); + + @Rule + public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true); + + @Test + public void showGeoFunctionsRegistered() { + int count = 0; + final Collection<Function> funcs = FunctionRegistry.getInstance().getAll(); + for (final Function fun : funcs) { + if (fun.getURI().startsWith(GEO)) { + count++; + } + } + + // There are 30 geo functions registered, ensure that there are 30. + assertEquals(30, count); + } + + @Test + public void showProcessorWorks() throws Exception { + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Get the RDF model objects that will be used to build the query. + final String sparql = + "PREFIX time: <http://www.w3.org/2006/time#> \n" + + "PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> \n" + + "PREFIX geo: <http://www.opengis.net/ont/geosparql#>\n" + + "PREFIX geof: <" + GEO + ">\n" + + "SELECT * \n" + + "WHERE { \n" + + " <urn:event1> geo:asWKT ?point .\n" + + " FILTER(geof:sfWithin(?point, \"POLYGON((-3 -2, -3 2, 1 2, 1 -2, -3 -2))\"^^geo:wktLiteral)) " + + "}"; + + // Setup a topology. + final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Create the statements that will be input into the query. 
+ final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = getStatements(); + + // Make the expected results. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + final MapBindingSet bs = new MapBindingSet(); + final WKTWriter w = new WKTWriter(); + bs.addBinding("point", vf.createLiteral(w.write(ZERO), GeoConstants.XMLSCHEMA_OGC_WKT)); + expected.add( new VisibilityBindingSet(bs, "a") ); + + // Run the test. + KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); + } + + private List<VisibilityStatement> getStatements() throws Exception { + final List<VisibilityStatement> statements = new ArrayList<>(); + // geo 2x2 points + statements.add(new VisibilityStatement(statement(ZERO), "a")); + statements.add(new VisibilityStatement(statement(ONE), "a")); + return statements; + } + + private static Statement statement(final Geometry geo) { + final ValueFactory vf = new ValueFactoryImpl(); + final Resource subject = vf.createURI("urn:event1"); + final URI predicate = GeoConstants.GEO_AS_WKT; + final WKTWriter w = new WKTWriter(); + final Value object = vf.createLiteral(w.write(geo), GeoConstants.XMLSCHEMA_OGC_WKT); + return new StatementImpl(subject, predicate, object); + } +} \ No newline at end of file
