NIFI-855 Adding support for filtering on multiple locations

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/706edeb0
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/706edeb0
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/706edeb0

Branch: refs/heads/master
Commit: 706edeb01fad32aca2dce280bde16f1926e2f76c
Parents: 3cca046
Author: Bryan Bende <[email protected]>
Authored: Wed Aug 19 20:56:42 2015 -0400
Committer: Bryan Bende <[email protected]>
Committed: Wed Aug 19 20:57:57 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/twitter/GetTwitter.java     | 92 +++++++++----------
 .../nifi/processors/twitter/LocationUtil.java   | 93 ++++++++++++++++++++
 .../nifi/processors/twitter/TestGetTwitter.java | 84 ++++++++++++++++++
 .../processors/twitter/TestLocationUtil.java    | 74 ++++++++++++++++
 4 files changed, 294 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/706edeb0/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
 
b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
index d487492..a78b112 100644
--- 
a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
+++ 
b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
@@ -124,9 +124,11 @@ public class GetTwitter extends AbstractProcessor {
             .addValidator(new FollowingValidator())
             .build();
     public static final PropertyDescriptor LOCATIONS = new 
PropertyDescriptor.Builder()
-            .name("Location to Filter")
-            .description(" Bounding box for filtering tweets. Enter SW and NE 
corners in field (longitude, latitude, longitude, latitude). Example" +
-                    "-77.55661, 39.25831, -77.14325, 39.5374")
+            .name("Locations to Filter On")
+            .description("A comma-separated list of coordinates specifying one 
or more bounding boxes to filter on."
+                    + "Each bounding box is specified by a pair of coordinates 
in the format: swLon,swLat,neLon,neLat. "
+                    + "Multiple bounding boxes can be specified as such: 
swLon1,swLat1,neLon1,neLat1,swLon2,swLat2,neLon2,neLat2."
+                    + "Ignored unless Endpoint is set to 'Filter Endpoint'.")
             .addValidator(new LocationValidator() )
             .required(false)
             .build();
@@ -299,28 +301,15 @@ public class GetTwitter extends AbstractProcessor {
             }
 
             final String locationString = 
context.getProperty(LOCATIONS).getValue();
-            final String[] corSplit = locationString.split(",");
-
-            final List<Location> locationIds ;
-
-            double swLon = Double.parseDouble( corSplit[0] ) ;
-            double neLon = Double.parseDouble( corSplit[2] ) ;
-            double swLat = Double.parseDouble( corSplit[1] ) ;
-            double neLat = Double.parseDouble( corSplit[3] ) ;
-
-            Coordinate sw = new Coordinate( swLon, swLat ) ;
-            Coordinate ne = new Coordinate( neLon, neLat ) ;
-            Location bbox = new Location ( sw, ne ) ;
-
-            if ( locationString == null ) {
-                locationIds = Collections.emptyList();
+            final List<Location> locations;
+            if (locationString == null) {
+                locations = Collections.emptyList();
             } else {
-                locationIds = new ArrayList<>();
-                locationIds.add( bbox );
+                locations = LocationUtil.parseLocations(locationString);
             }
 
-            if ( !locationIds.isEmpty() ) {
-                filterEndpoint.locations(locationIds);
+            if (!locations.isEmpty()) {
+                filterEndpoint.locations(locations);
             }
 
             streamingEndpoint = filterEndpoint ;
@@ -402,42 +391,47 @@ public class GetTwitter extends AbstractProcessor {
 
     private static class LocationValidator implements Validator {
 
-        private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
-
         @Override
         public ValidationResult validate(final String subject, final String 
input, final ValidationContext context) {
-            final String[] splits = input.split(",");
-            if ( splits.length != 4 ) {
-
-                return new 
ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("Must
 be comma-separated list of coordinates, SW and NE corners of your bounding 
box.").build();
-
-            } else {
-
-                try {
-                    // Validate longitude coordinates
-                    double swLon = Double.parseDouble(splits[0]);
-                    double neLon = Double.parseDouble(splits[2]);
-                    if (swLon < neLon) {
+            try {
+                final List<Location> locations = 
LocationUtil.parseLocations(input);
+                for (final Location location : locations) {
+                    final Coordinate sw = location.southwestCoordinate();
+                    final Coordinate ne = location.northeastCoordinate();
+
+                    if (sw.longitude() > ne.longitude()) {
+                        return new 
ValidationResult.Builder().input(input).subject(subject).valid(false)
+                                .explanation("SW Longitude (" + sw.longitude() 
+ ") must be less than NE Longitude ("
+                                        + ne.longitude() + ").").build();
+                    }
 
-                        // Validate latitude coordinates
-                        double swLat = Double.parseDouble(splits[1]);
-                        double neLat = Double.parseDouble(splits[3]);
-                        if (swLat < neLat) {
-                            return new 
ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+                    if (sw.longitude() == ne.longitude()) {
+                        return new 
ValidationResult.Builder().input(input).subject(subject).valid(false)
+                                .explanation("SW Longitude (" + sw.longitude() 
+ ") can not be equal to NE Longitude ("
+                                        + ne.longitude() + ").").build();
+                    }
 
-                        } else {
-                            return new 
ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("SW
 Latitude must be less than NE Latitude.").build();
-                        }
+                    if (sw.latitude() > ne.latitude()) {
+                        return new 
ValidationResult.Builder().input(input).subject(subject).valid(false)
+                                .explanation("SW Latitude (" + sw.latitude() + 
") must be less than NE Latitude ("
+                                        + ne.latitude() + ").").build();
+                    }
 
-                    } else {
-                        return new 
ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("SW
 Longitude must be less than NE Longitude.").build();
+                    if (sw.latitude() == ne.latitude()) {
+                        return new 
ValidationResult.Builder().input(input).subject(subject).valid(false)
+                                .explanation("SW Latitude (" + sw.latitude() + 
") can not be equal to NE Latitude ("
+                                        + ne.latitude() + ").").build();
                     }
-                } catch ( Exception e ) {
-                    return new 
ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("Bounding
 box location parse failure.").build();
                 }
 
-            }
+                return new 
ValidationResult.Builder().subject(subject).input(input).valid(true).build();
 
+            } catch (IllegalStateException e) {
+                return new ValidationResult.Builder()
+                        .input(input).subject(subject).valid(false)
+                        .explanation("Must be a comma-separated list of 
longitude,latitude pairs specifying one or more bounding boxes.")
+                        .build();
+            }
         }
 
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/706edeb0/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/LocationUtil.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/LocationUtil.java
 
b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/LocationUtil.java
new file mode 100644
index 0000000..63e881a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/LocationUtil.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.twitter;
+
+import com.twitter.hbc.core.endpoint.Location;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Utility for parsing locations to be used with the Twitter API:
+ *
+ * https://dev.twitter.com/streaming/overview/request-parameters#locations
+ *
+ */
+public class LocationUtil {
+
+    static final String LON = "[-+]?\\d{1,3}(?:[.]\\d+)?";
+    static final String LAT = "[-+]?\\d{1,2}(?:[.]\\d+)?";
+    static final String LON_LAT = LON + ",\\s*" + LAT;
+    static final String LOCATION = LON_LAT + ",\\s*" + LON_LAT;
+
+    /**
+     * Used to find locations one at a time after knowing the locations String 
is valid.
+     */
+    static final Pattern LOCATION_PATTERN = Pattern.compile(LOCATION);
+
+    /**
+     * One or more locations separated by a comma, example: 
lon,lat,lon,lat,lon,lat,lon,lat
+     */
+    static final Pattern LOCATIONS_PATTERN = Pattern.compile("(?:" + LOCATION 
+ ")(?:,\\s*" + LOCATION + ")*");
+
+
+    /**
+     *
+     * @param input a comma-separated list of longitude,latitude pairs 
specifying a set of bounding boxes,
+     *              with the southwest corner of the bounding box coming first
+     *
+     * @return a list of the Location instances represented by the provided 
input
+     */
+    public static List<Location> parseLocations(final String input) {
+        final List<Location> locations = new ArrayList<>();
+        final Matcher locationsMatcher = LOCATIONS_PATTERN.matcher(input);
+        if (locationsMatcher.matches()) {
+            Matcher locationMatcher = LOCATION_PATTERN.matcher(input);
+            while (locationMatcher.find()) {
+                final String location = locationMatcher.group();
+                locations.add(parseLocation(location));
+            }
+        } else {
+            throw new IllegalStateException("The provided location string was 
invalid.");
+        }
+
+        return locations;
+    }
+
+    /**
+     *
+     * @param location a comma-separated list of longitude,latitude pairs 
specifying one location
+     *
+     * @return the Location instance for the provided input
+     */
+    public static Location parseLocation(final String location) {
+        final String[] corSplit = location.split(",");
+
+        final double swLon = Double.parseDouble(corSplit[0]) ;
+        final double swLat = Double.parseDouble(corSplit[1]) ;
+
+        final double neLon = Double.parseDouble(corSplit[2]) ;
+        final double neLat = Double.parseDouble(corSplit[3]) ;
+
+        Location.Coordinate sw = new Location.Coordinate(swLon, swLat) ;
+        Location.Coordinate ne = new Location.Coordinate(neLon, neLat) ;
+        return new Location(sw, ne) ;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/706edeb0/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestGetTwitter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestGetTwitter.java
 
b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestGetTwitter.java
new file mode 100644
index 0000000..132fa17
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestGetTwitter.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.twitter;
+
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+public class TestGetTwitter {
+
+    @Test
+    public void testLocationValidatorWithValidLocations() {
+        final TestRunner runner = TestRunners.newTestRunner(GetTwitter.class);
+        runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
+        runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
+        runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
+        runner.setProperty(GetTwitter.ACCESS_TOKEN, "acessToken");
+        runner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, 
"accessTokenSecret");
+        runner.setProperty(GetTwitter.LOCATIONS, 
"-122.75,36.8,-121.75,37.8,-74,40,-73,41");
+        runner.assertValid();
+    }
+
+    @Test
+    public void testLocationValidatorWithEqualLatitudes() {
+        final TestRunner runner = TestRunners.newTestRunner(GetTwitter.class);
+        runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
+        runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
+        runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
+        runner.setProperty(GetTwitter.ACCESS_TOKEN, "acessToken");
+        runner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, 
"accessTokenSecret");
+        runner.setProperty(GetTwitter.LOCATIONS, 
"-122.75,36.8,-121.75,37.8,-74,40,-73,40");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testLocationValidatorWithEqualLongitudes() {
+        final TestRunner runner = TestRunners.newTestRunner(GetTwitter.class);
+        runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
+        runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
+        runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
+        runner.setProperty(GetTwitter.ACCESS_TOKEN, "acessToken");
+        runner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, 
"accessTokenSecret");
+        runner.setProperty(GetTwitter.LOCATIONS, 
"-122.75,36.8,-121.75,37.8,-74,40,-74,41");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testLocationValidatorWithSWLatGreaterThanNELat() {
+        final TestRunner runner = TestRunners.newTestRunner(GetTwitter.class);
+        runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
+        runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
+        runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
+        runner.setProperty(GetTwitter.ACCESS_TOKEN, "acessToken");
+        runner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, 
"accessTokenSecret");
+        runner.setProperty(GetTwitter.LOCATIONS, 
"-122.75,36.8,-121.75,37.8,-74,40,-73,39");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testLocationValidatorWithSWLonGreaterThanNELon() {
+        final TestRunner runner = TestRunners.newTestRunner(GetTwitter.class);
+        runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
+        runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
+        runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
+        runner.setProperty(GetTwitter.ACCESS_TOKEN, "acessToken");
+        runner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, 
"accessTokenSecret");
+        runner.setProperty(GetTwitter.LOCATIONS, 
"-122.75,36.8,-121.75,37.8,-74,40,-75,41");
+        runner.assertNotValid();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/706edeb0/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestLocationUtil.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestLocationUtil.java
 
b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestLocationUtil.java
new file mode 100644
index 0000000..4d21613
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestLocationUtil.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.twitter;
+
+import com.twitter.hbc.core.endpoint.Location;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class TestLocationUtil {
+
+    @Test
+    public void testParseLocationsSingle() {
+        final String swLon = "-122.75";
+        final String swLat = "36.8";
+        final String neLon = "-121.75";
+        final String neLat = "37.8";
+
+        final String locationString = swLon + "," + swLat + "," + neLon + "," 
+ neLat;
+        List<Location> locations = LocationUtil.parseLocations(locationString);
+        Assert.assertEquals(1, locations.size());
+
+        Location location = locations.get(0);
+        Assert.assertEquals(new 
Double(location.southwestCoordinate().longitude()), Double.valueOf(swLon));
+        Assert.assertEquals(new 
Double(location.southwestCoordinate().latitude()), Double.valueOf(swLat));
+        Assert.assertEquals(new 
Double(location.northeastCoordinate().longitude()), Double.valueOf(neLon));
+        Assert.assertEquals(new 
Double(location.northeastCoordinate().latitude()), Double.valueOf(neLat));
+    }
+
+    @Test
+    public void testParseLocationsMultiple() {
+        final Location location1 = new Location(new 
Location.Coordinate(-122.75, 36.8), new Location.Coordinate(-121.75,37.8));
+        final Location location2 = new Location(new Location.Coordinate(-74, 
40), new Location.Coordinate(-73, 41));
+        final Location location3 = new Location(new Location.Coordinate(-64, 
30), new Location.Coordinate(-63, 31));
+        final Location location4 = new Location(new Location.Coordinate(-54, 
20), new Location.Coordinate(-53, 21));
+
+        final List<Location> expectedLocations = Arrays.asList(location1, 
location2, location3, location4);
+
+        final String locationString = 
"-122.75,36.8,-121.75,37.8,-74,40,-73,41,-64,30,-63,31,-54,20,-53,21";
+        List<Location> locations = LocationUtil.parseLocations(locationString);
+        Assert.assertEquals(expectedLocations.size(), locations.size());
+
+        for (Location expectedLocation : expectedLocations) {
+            boolean found = false;
+            for (Location location : locations) {
+                if (location.northeastCoordinate().longitude() == 
expectedLocation.northeastCoordinate().longitude()
+                        && location.northeastCoordinate().latitude() == 
expectedLocation.northeastCoordinate().latitude()
+                        && location.southwestCoordinate().longitude() == 
expectedLocation.southwestCoordinate().longitude()
+                        && location.southwestCoordinate().latitude() == 
expectedLocation.southwestCoordinate().latitude()) {
+                    found = true;
+                    break;
+                }
+            }
+            Assert.assertTrue(found);
+        }
+
+    }
+}

Reply via email to