RYA-377 Geo Filter support

Added geo filter support to rya streams.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/9cd0c568
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/9cd0c568
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/9cd0c568

Branch: refs/heads/master
Commit: 9cd0c568d14f104a1604c2b5425a7fe9d154fc24
Parents: 9442322
Author: Andrew Smith <[email protected]>
Authored: Tue Nov 28 13:12:57 2017 -0500
Committer: caleb <[email protected]>
Committed: Tue Jan 9 15:13:01 2018 -0500

----------------------------------------------------------------------
 .../rya.pcj.fluo/rya.pcj.functions.geo/pom.xml  |  34 +----
 extras/rya.streams/kafka/pom.xml                |  22 ++-
 .../kafka/processors/filter/GeoFilterIT.java    | 139 +++++++++++++++++++
 3 files changed, 162 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9cd0c568/extras/rya.pcj.fluo/rya.pcj.functions.geo/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/rya.pcj.functions.geo/pom.xml 
b/extras/rya.pcj.fluo/rya.pcj.functions.geo/pom.xml
index b9b2143..373d869 100644
--- a/extras/rya.pcj.fluo/rya.pcj.functions.geo/pom.xml
+++ b/extras/rya.pcj.fluo/rya.pcj.functions.geo/pom.xml
@@ -40,35 +40,15 @@ under the License.
             <artifactId>rya.api</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.apache.rya</groupId>
-            <artifactId>rya.pcj.fluo.api</artifactId>
-        </dependency>
-        <dependency>
             <groupId>com.vividsolutions</groupId>
             <artifactId>jts</artifactId>
             <version>1.13</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.rya</groupId>
-            <artifactId>rya.pcj.fluo.client</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.rya</groupId>
-            <artifactId>rya.indexing</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.rya</groupId>
-            <artifactId>rya.pcj.fluo.test.base</artifactId>
-        </dependency>
-        <dependency>
             <groupId>org.eclipse.rdf4j</groupId>
             <artifactId>rdf4j-queryalgebra-geosparql</artifactId>
             <version>2.2</version>
         </dependency>
-         <dependency>
-            <groupId>org.apache.fluo</groupId>
-            <artifactId>fluo-api</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.eclipse.rdf4j</groupId>
             <artifactId>rdf4j-queryalgebra-evaluation</artifactId>
@@ -76,18 +56,8 @@ under the License.
         </dependency>
         <!-- Testing dependencies. -->
         <dependency>
-            <groupId>org.apache.fluo</groupId>
-            <artifactId>fluo-mini</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-             <groupId>org.apache.fluo</groupId>
-            <artifactId>fluo-recipes-test</artifactId>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.pcj.fluo.test.base</artifactId>
             <scope>test</scope>
         </dependency>
     </dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9cd0c568/extras/rya.streams/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/pom.xml b/extras/rya.streams/kafka/pom.xml
index 0ccbb6e..2d33f32 100644
--- a/extras/rya.streams/kafka/pom.xml
+++ b/extras/rya.streams/kafka/pom.xml
@@ -33,6 +33,26 @@ under the License.
         This module contains the Rya Streams components that integrate with 
Kafka.
     </description>
 
+    <profiles>
+        <profile>
+            <id>geoindexing</id>
+                <dependencies>
+                    <!-- Rya dependencies -->
+                    <dependency>
+                        <groupId>org.apache.rya</groupId>
+                        <artifactId>rya.pcj.functions.geo</artifactId>
+                        <version>3.2.12-incubating-SNAPSHOT</version>
+                    </dependency>
+                    <dependency>
+                        <groupId>org.apache.rya</groupId>
+                        <artifactId>rya.geo.common</artifactId>
+                        <version>${project.version}</version>
+                    </dependency>
+                </dependencies>
+        </profile>
+    </profiles>
+
+
     <dependencies>
         <!-- Rya dependencies -->
         <dependency>
@@ -95,4 +115,4 @@ under the License.
             <scope>test</scope>
         </dependency>
     </dependencies>
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9cd0c568/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
new file mode 100644
index 0000000..4057e18
--- /dev/null
+++ 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.filter;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.indexing.GeoConstants;
+import org.apache.rya.streams.kafka.KafkaTestUtil;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import 
org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor;
+import 
org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.Resource;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.StatementImpl;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.algebra.evaluation.function.Function;
+import org.openrdf.query.algebra.evaluation.function.FunctionRegistry;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.vividsolutions.jts.geom.Coordinate;
+import com.vividsolutions.jts.geom.Geometry;
+import com.vividsolutions.jts.geom.GeometryFactory;
+import com.vividsolutions.jts.io.WKTWriter;
+
+/**
+ * Integration tests the methods of {@link FilterProcessor}.
+ */
+public class GeoFilterIT {
+    private static final String GEO = 
"http://www.opengis.net/def/function/geosparql/";;
+    private static final GeometryFactory GF = new GeometryFactory();
+    private static final Geometry ZERO = GF.createPoint(new Coordinate(0, 0));
+    private static final Geometry ONE = GF.createPoint(new Coordinate(1, 1));
+
+    @Rule
+    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
+
+    @Test
+    public void showGeoFunctionsRegistered() {
+        int count = 0;
+        final Collection<Function> funcs = 
FunctionRegistry.getInstance().getAll();
+        for (final Function fun : funcs) {
+            if (fun.getURI().startsWith(GEO)) {
+                count++;
+            }
+        }
+
+        // There are 30 geo functions registered, ensure that there are 30.
+        assertEquals(30, count);
+    }
+
+    @Test
+    public void showProcessorWorks() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Get the RDF model objects that will be used to build the query.
+        final String sparql =
+                "PREFIX time: <http://www.w3.org/2006/time#> \n"
+                        + "PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> \n"
+                        + "PREFIX geo: 
<http://www.opengis.net/ont/geosparql#>\n"
+                        + "PREFIX geof: <" + GEO + ">\n"
+                        + "SELECT * \n"
+                        + "WHERE { \n"
+                        + "  <urn:event1> geo:asWKT ?point .\n"
+                        + " FILTER(geof:sfWithin(?point, \"POLYGON((-3 -2, -3 
2, 1 2, 1 -2, -3 -2))\"^^geo:wktLiteral)) "
+                        + "}";
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyFactory().build(sparql, 
statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Create the statements that will be input into the query.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = getStatements();
+
+        // Make the expected results.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        final MapBindingSet bs = new MapBindingSet();
+        final WKTWriter w = new WKTWriter();
+        bs.addBinding("point", vf.createLiteral(w.write(ZERO), 
GeoConstants.XMLSCHEMA_OGC_WKT));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        // Run the test.
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, 
resultsTopic, builder, 2000, statements, expected, 
VisibilityBindingSetDeserializer.class);
+    }
+
+    private List<VisibilityStatement> getStatements() throws Exception {
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        // geo 2x2 points
+        statements.add(new VisibilityStatement(statement(ZERO), "a"));
+        statements.add(new VisibilityStatement(statement(ONE), "a"));
+        return statements;
+    }
+
+    private static Statement statement(final Geometry geo) {
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Resource subject = vf.createURI("urn:event1");
+        final URI predicate = GeoConstants.GEO_AS_WKT;
+        final WKTWriter w = new WKTWriter();
+        final Value object = vf.createLiteral(w.write(geo), 
GeoConstants.XMLSCHEMA_OGC_WKT);
+        return new StatementImpl(subject, predicate, object);
+    }
+}
\ No newline at end of file

Reply via email to