Repository: incubator-nifi
Updated Branches:
  refs/heads/develop 90ae022e5 -> 269e25e99


NIFI-271


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/269e25e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/269e25e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/269e25e9

Branch: refs/heads/develop
Commit: 269e25e9934ca54277f016bdb5a1589bbe021a76
Parents: 90ae022
Author: joewitt <[email protected]>
Authored: Sat Apr 25 08:52:47 2015 -0400
Committer: joewitt <[email protected]>
Committed: Sat Apr 25 08:52:47 2015 -0400

----------------------------------------------------------------------
 .../nifi-twitter-processors/pom.xml             |  10 +-
 .../nifi/processors/twitter/GetTwitter.java     | 450 ++++++++++---------
 2 files changed, 232 insertions(+), 228 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/269e25e9/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/pom.xml
index 45af0ce..4768dbc 100644
--- a/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/pom.xml
@@ -35,11 +35,11 @@
             <artifactId>nifi-processor-utils</artifactId>
         </dependency>
        
-               <dependency>
-                       <groupId>com.twitter</groupId>
-                       <artifactId>hbc-twitter4j</artifactId>
-                       <version>2.2.0</version>
-               </dependency>
+        <dependency>
+            <groupId>com.twitter</groupId>
+            <artifactId>hbc-twitter4j</artifactId>
+            <version>2.2.0</version>
+        </dependency>
         
         <dependency>
             <groupId>org.apache.nifi</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/269e25e9/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java b/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
index 45b1ae1..a056867 100644
--- a/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
+++ b/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
@@ -69,82 +69,84 @@ import com.twitter.hbc.httpclient.auth.OAuth1;
 @SupportsBatching
 @Tags({"twitter", "tweets", "social media", "status", "json"})
 @CapabilityDescription("Pulls status changes from Twitter's streaming API")
-@WritesAttribute(attribute="mime.type", description="Sets mime type to 
application/json")
+@WritesAttribute(attribute = "mime.type", description = "Sets mime type to 
application/json")
 public class GetTwitter extends AbstractProcessor {
 
-       static final AllowableValue ENDPOINT_SAMPLE = new 
AllowableValue("Sample Endpoint", "Sample Endpoint", "The endpoint that 
provides public data, aka a 'garden hose'");
-       static final AllowableValue ENDPOINT_FIREHOSE = new 
AllowableValue("Firehose Endpoint", "Firehose Endpoint", "The endpoint that 
provides access to all tweets");
-       static final AllowableValue ENDPOINT_FILTER = new 
AllowableValue("Filter Endpoint", "Filter Endpoint", "Endpoint that allows the 
stream to be filtered by specific terms or User IDs");
-       
-       public static final PropertyDescriptor ENDPOINT = new 
PropertyDescriptor.Builder()
-               .name("Twitter Endpoint")
-               .description("Specifies which endpoint data should be pulled 
from")
-               .required(true)
-               .allowableValues(ENDPOINT_SAMPLE, ENDPOINT_FIREHOSE, 
ENDPOINT_FILTER)
-               .defaultValue(ENDPOINT_SAMPLE.getValue())
-               .build();
+    static final AllowableValue ENDPOINT_SAMPLE = new AllowableValue("Sample 
Endpoint", "Sample Endpoint", "The endpoint that provides public data, aka a 
'garden hose'");
+    static final AllowableValue ENDPOINT_FIREHOSE = new 
AllowableValue("Firehose Endpoint", "Firehose Endpoint", "The endpoint that 
provides access to all tweets");
+    static final AllowableValue ENDPOINT_FILTER = new AllowableValue("Filter 
Endpoint", "Filter Endpoint", "Endpoint that allows the stream to be filtered 
by specific terms or User IDs");
+
+    public static final PropertyDescriptor ENDPOINT = new 
PropertyDescriptor.Builder()
+            .name("Twitter Endpoint")
+            .description("Specifies which endpoint data should be pulled from")
+            .required(true)
+            .allowableValues(ENDPOINT_SAMPLE, ENDPOINT_FIREHOSE, 
ENDPOINT_FILTER)
+            .defaultValue(ENDPOINT_SAMPLE.getValue())
+            .build();
     public static final PropertyDescriptor CONSUMER_KEY = new 
PropertyDescriptor.Builder()
-       .name("Consumer Key")
-       .description("The Consumer Key provided by Twitter")
-       .required(true)
-       .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-       .build();
+            .name("Consumer Key")
+            .description("The Consumer Key provided by Twitter")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
     public static final PropertyDescriptor CONSUMER_SECRET = new 
PropertyDescriptor.Builder()
-               .name("Consumer Secret")
-               .description("The Consumer Secret provided by Twitter")
-               .required(true)
-               .sensitive(true)
-               .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-               .build();
+            .name("Consumer Secret")
+            .description("The Consumer Secret provided by Twitter")
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
     public static final PropertyDescriptor ACCESS_TOKEN = new 
PropertyDescriptor.Builder()
-               .name("Access Token")
-               .description("The Acces Token provided by Twitter")
-               .required(true)
-               .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-               .build();
+            .name("Access Token")
+            .description("The Acces Token provided by Twitter")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
     public static final PropertyDescriptor ACCESS_TOKEN_SECRET = new 
PropertyDescriptor.Builder()
-               .name("Access Token Secret")
-               .description("The Access Token Secret provided by Twitter")
-               .required(true)
-               .sensitive(true)
-               .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-               .build();
+            .name("Access Token Secret")
+            .description("The Access Token Secret provided by Twitter")
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
     public static final PropertyDescriptor LANGUAGES = new 
PropertyDescriptor.Builder()
-       .name("Languages")
-       .description("A comma-separated list of languages for which tweets 
should be fetched")
-       .required(false)
-       .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-       .build();
+            .name("Languages")
+            .description("A comma-separated list of languages for which tweets 
should be fetched")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
     public static final PropertyDescriptor FOLLOWING = new 
PropertyDescriptor.Builder()
-               .name("IDs to Follow")
-               .description("A comma-separated list of Twitter User ID's to 
follow. Ignored unless Endpoint is set to 'Filter Endpoint'.")
-               .required(false)
-               .addValidator(new FollowingValidator())
-               .build();
+            .name("IDs to Follow")
+            .description("A comma-separated list of Twitter User ID's to 
follow. Ignored unless Endpoint is set to 'Filter Endpoint'.")
+            .required(false)
+            .addValidator(new FollowingValidator())
+            .build();
     public static final PropertyDescriptor TERMS = new 
PropertyDescriptor.Builder()
-               .name("Terms to Filter On")
-               .description("A comma-separated list of terms to filter on. 
Ignored unless Endpoint is set to 'Filter Endpoint'. The filter works such that 
if any term matches, the status update will be retrieved; multiple terms 
separated by a space function as an 'AND'. I.e., 'it was, hello' will retrieve 
status updates that have either 'hello' or both 'it' AND 'was'")
-               .required(false)
-               .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-               .build();
-    
-    
+            .name("Terms to Filter On")
+            .description("A comma-separated list of terms to filter on. 
Ignored unless Endpoint is set to 'Filter Endpoint'."
+                    + " The filter works such that if any term matches, the 
status update will be retrieved; multiple terms"
+                    + " separated by a space function as an 'AND'. I.e., 'it 
was, hello' will retrieve status updates that"
+                    + " have either 'hello' or both 'it' AND 'was'")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
-        .name("success")
-        .description("All status updates will be routed to this relationship")
-        .build();
+            .name("success")
+            .description("All status updates will be routed to this 
relationship")
+            .build();
 
     private List<PropertyDescriptor> descriptors;
     private Set<Relationship> relationships;
 
-    private final BlockingQueue<Event> eventQueue = new 
LinkedBlockingQueue<Event>(1000);
-    
+    private final BlockingQueue<Event> eventQueue = new 
LinkedBlockingQueue<>(1000);
+
     private volatile Client client;
     private volatile BlockingQueue<String> messageQueue;
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
-        final List<PropertyDescriptor> descriptors = new 
ArrayList<PropertyDescriptor>();
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
         descriptors.add(ENDPOINT);
         descriptors.add(CONSUMER_KEY);
         descriptors.add(CONSUMER_SECRET);
@@ -155,7 +157,7 @@ public class GetTwitter extends AbstractProcessor {
         descriptors.add(FOLLOWING);
         this.descriptors = Collections.unmodifiableList(descriptors);
 
-        final Set<Relationship> relationships = new HashSet<Relationship>();
+        final Set<Relationship> relationships = new HashSet<>();
         relationships.add(REL_SUCCESS);
         this.relationships = Collections.unmodifiableSet(relationships);
     }
@@ -169,192 +171,194 @@ public class GetTwitter extends AbstractProcessor {
     public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return descriptors;
     }
-    
+
     @Override
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
-       return new PropertyDescriptor.Builder()
-               .name(propertyDescriptorName)
-               .description("Adds a query parameter with name '" + 
propertyDescriptorName + "' to the Twitter query")
-               .required(false)
-               .dynamic(true)
-               .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-               .build();
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .description("Adds a query parameter with name '" + 
propertyDescriptorName + "' to the Twitter query")
+                .required(false)
+                .dynamic(true)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                .build();
     }
-    
+
     @Override
     protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
-       final List<ValidationResult> results = new ArrayList<>();
-       final String endpointName = 
validationContext.getProperty(ENDPOINT).getValue();
-       
-       if ( ENDPOINT_FILTER.getValue().equals(endpointName) ) {
-               if ( !validationContext.getProperty(TERMS).isSet() && 
!validationContext.getProperty(FOLLOWING).isSet() ) {
-                       results.add(new 
ValidationResult.Builder().input("").subject(FOLLOWING.getName()).valid(false).explanation("When
 using the 'Filter Endpoint', at least one of '" + TERMS.getName() + "' or '" + 
FOLLOWING.getName() + "' must be set").build());
-               }
-       }
-       
-       return results;
+        final List<ValidationResult> results = new ArrayList<>();
+        final String endpointName = 
validationContext.getProperty(ENDPOINT).getValue();
+
+        if (ENDPOINT_FILTER.getValue().equals(endpointName)) {
+            if (!validationContext.getProperty(TERMS).isSet() && 
!validationContext.getProperty(FOLLOWING).isSet()) {
+                results.add(new 
ValidationResult.Builder().input("").subject(FOLLOWING.getName())
+                        .valid(false).explanation("When using the 'Filter 
Endpoint', at least one of '" + TERMS.getName() + "' or '" + 
FOLLOWING.getName() + "' must be set").build());
+            }
+        }
+
+        return results;
     }
-    
+
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
-       // if any property is modified, the results are no longer valid. 
Destroy all messages in teh queue.
-       messageQueue.clear();
+        // if any property is modified, the results are no longer valid. 
Destroy all messages in teh queue.
+        messageQueue.clear();
     }
 
     @OnScheduled
     public void onScheduled(final ProcessContext context) throws 
MalformedURLException {
-       messageQueue = new LinkedBlockingQueue<>(100000);
-       
-       final String endpointName = context.getProperty(ENDPOINT).getValue();
-       final Authentication oauth = new 
OAuth1(context.getProperty(CONSUMER_KEY).getValue(), 
-                       context.getProperty(CONSUMER_SECRET).getValue(), 
-                       context.getProperty(ACCESS_TOKEN).getValue(),
-                       context.getProperty(ACCESS_TOKEN_SECRET).getValue());
-
-       final ClientBuilder clientBuilder = new ClientBuilder();
-       clientBuilder.name("GetTwitter[id=" + getIdentifier() + "]")
-               .authentication(oauth)
-               .eventMessageQueue(eventQueue)
-               .processor(new StringDelimitedProcessor(messageQueue));
-
-       final String languageString = context.getProperty(LANGUAGES).getValue();
-       final List<String> languages;
-       if ( languageString == null ) {
-               languages = null;
-       } else {
-               languages = new ArrayList<>();
-               for ( final String language : 
context.getProperty(LANGUAGES).getValue().split(",") ) {
-                       languages.add(language.trim());
-               }
-       }
-       
-       final String host;
-       final StreamingEndpoint streamingEndpoint;
-       if ( ENDPOINT_SAMPLE.getValue().equals(endpointName) ) {
-               host = Constants.STREAM_HOST;
-               final StatusesSampleEndpoint sse = new StatusesSampleEndpoint();
-               streamingEndpoint = sse;
-               if ( languages != null ) {
-                       sse.languages(languages);
-               }
-       } else if ( ENDPOINT_FIREHOSE.getValue().equals(endpointName) ) {
-               host = Constants.STREAM_HOST;
-               final StatusesFirehoseEndpoint firehoseEndpoint = new 
StatusesFirehoseEndpoint();
-               streamingEndpoint = firehoseEndpoint;
-               if ( languages != null ) {
-                       firehoseEndpoint.languages(languages);
-               }
-       } else if ( ENDPOINT_FILTER.getValue().equals(endpointName) ) {
-               host = Constants.STREAM_HOST;
-               final StatusesFilterEndpoint filterEndpoint = new 
StatusesFilterEndpoint();
-               
-               final String followingString = 
context.getProperty(FOLLOWING).getValue();
-               final List<Long> followingIds;
-               if ( followingString == null ) {
-                       followingIds = Collections.emptyList();
-               } else {
-                       followingIds = new ArrayList<>();
-                       
-                       for ( final String split : followingString.split(",") ) 
{
-                               final Long id = Long.parseLong(split.trim());
-                               followingIds.add(id);
-                       }
-               }
-               
-               final String termString = context.getProperty(TERMS).getValue();
-               final List<String> terms;
-               if ( termString == null ) {
-                       terms = Collections.emptyList();
-               } else {
-                       terms = new ArrayList<>();
-                       for ( final String split : termString.split(",") ) {
-                               terms.add(split.trim());
-                       }
-               }
-               
-               if ( !terms.isEmpty() ) {
-                       filterEndpoint.trackTerms(terms);
-               }
-               
-               if ( !followingIds.isEmpty() ) {
-                       filterEndpoint.followings(followingIds);
-               }
-               
-               if ( languages != null ) {
-                       filterEndpoint.languages(languages);
-               }
-               streamingEndpoint = filterEndpoint;
-       } else {
-               throw new AssertionError("Endpoint was invalid value: " + 
endpointName);
-       }
-
-       clientBuilder.hosts(host).endpoint(streamingEndpoint);
-       client = clientBuilder.build();
-       client.connect();
+        messageQueue = new LinkedBlockingQueue<>(100000);
+
+        final String endpointName = context.getProperty(ENDPOINT).getValue();
+        final Authentication oauth = new 
OAuth1(context.getProperty(CONSUMER_KEY).getValue(),
+                context.getProperty(CONSUMER_SECRET).getValue(),
+                context.getProperty(ACCESS_TOKEN).getValue(),
+                context.getProperty(ACCESS_TOKEN_SECRET).getValue());
+
+        final ClientBuilder clientBuilder = new ClientBuilder();
+        clientBuilder.name("GetTwitter[id=" + getIdentifier() + "]")
+                .authentication(oauth)
+                .eventMessageQueue(eventQueue)
+                .processor(new StringDelimitedProcessor(messageQueue));
+
+        final String languageString = 
context.getProperty(LANGUAGES).getValue();
+        final List<String> languages;
+        if (languageString == null) {
+            languages = null;
+        } else {
+            languages = new ArrayList<>();
+            for (final String language : 
context.getProperty(LANGUAGES).getValue().split(",")) {
+                languages.add(language.trim());
+            }
+        }
+
+        final String host;
+        final StreamingEndpoint streamingEndpoint;
+        if (ENDPOINT_SAMPLE.getValue().equals(endpointName)) {
+            host = Constants.STREAM_HOST;
+            final StatusesSampleEndpoint sse = new StatusesSampleEndpoint();
+            streamingEndpoint = sse;
+            if (languages != null) {
+                sse.languages(languages);
+            }
+        } else if (ENDPOINT_FIREHOSE.getValue().equals(endpointName)) {
+            host = Constants.STREAM_HOST;
+            final StatusesFirehoseEndpoint firehoseEndpoint = new 
StatusesFirehoseEndpoint();
+            streamingEndpoint = firehoseEndpoint;
+            if (languages != null) {
+                firehoseEndpoint.languages(languages);
+            }
+        } else if (ENDPOINT_FILTER.getValue().equals(endpointName)) {
+            host = Constants.STREAM_HOST;
+            final StatusesFilterEndpoint filterEndpoint = new 
StatusesFilterEndpoint();
+
+            final String followingString = 
context.getProperty(FOLLOWING).getValue();
+            final List<Long> followingIds;
+            if (followingString == null) {
+                followingIds = Collections.emptyList();
+            } else {
+                followingIds = new ArrayList<>();
+
+                for (final String split : followingString.split(",")) {
+                    final Long id = Long.parseLong(split.trim());
+                    followingIds.add(id);
+                }
+            }
+
+            final String termString = context.getProperty(TERMS).getValue();
+            final List<String> terms;
+            if (termString == null) {
+                terms = Collections.emptyList();
+            } else {
+                terms = new ArrayList<>();
+                for (final String split : termString.split(",")) {
+                    terms.add(split.trim());
+                }
+            }
+
+            if (!terms.isEmpty()) {
+                filterEndpoint.trackTerms(terms);
+            }
+
+            if (!followingIds.isEmpty()) {
+                filterEndpoint.followings(followingIds);
+            }
+
+            if (languages != null) {
+                filterEndpoint.languages(languages);
+            }
+            streamingEndpoint = filterEndpoint;
+        } else {
+            throw new AssertionError("Endpoint was invalid value: " + 
endpointName);
+        }
+
+        clientBuilder.hosts(host).endpoint(streamingEndpoint);
+        client = clientBuilder.build();
+        client.connect();
     }
 
     @OnStopped
     public void shutdownClient() {
-       if ( client != null ) {
-               client.stop();
-       }
+        if (client != null) {
+            client.stop();
+        }
     }
-    
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-       final Event event = eventQueue.poll();
-       if ( event != null ) {
-               switch (event.getEventType()) {
-                       case STOPPED_BY_ERROR:
-                               getLogger().error("Received error {}: {} due to 
{}. Will not attempt to reconnect", new Object[] {event.getEventType(), 
event.getMessage(), event.getUnderlyingException()});
-                               break;
-                       case CONNECTION_ERROR:
-                       case HTTP_ERROR:
-                               getLogger().error("Received error {}: {}. Will 
attempt to reconnect", new Object[] {event.getEventType(), event.getMessage()});
-                               client.reconnect();
-                               break;
-                       default:
-                               break;
-               }
-       }
-       
-       final String tweet = messageQueue.poll();
-       if ( tweet == null ) {
-               context.yield();
-               return;
-       }
-       
-       FlowFile flowFile = session.create();
-       flowFile = session.write(flowFile, new OutputStreamCallback() {
-                       @Override
-                       public void process(final OutputStream out) throws 
IOException {
-                               
out.write(tweet.getBytes(StandardCharsets.UTF_8));
-                       }
-       });
-       
-       final Map<String, String> attributes = new HashMap<>();
-       attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
-       attributes.put(CoreAttributes.FILENAME.key(), 
flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".json");
-       flowFile = session.putAllAttributes(flowFile, attributes);
-       
-       session.transfer(flowFile, REL_SUCCESS);
-       session.getProvenanceReporter().receive(flowFile, Constants.STREAM_HOST 
+ client.getEndpoint().getURI().toString());
+        final Event event = eventQueue.poll();
+        if (event != null) {
+            switch (event.getEventType()) {
+                case STOPPED_BY_ERROR:
+                    getLogger().error("Received error {}: {} due to {}. Will 
not attempt to reconnect", new Object[]{event.getEventType(), 
event.getMessage(), event.getUnderlyingException()});
+                    break;
+                case CONNECTION_ERROR:
+                case HTTP_ERROR:
+                    getLogger().error("Received error {}: {}. Will attempt to 
reconnect", new Object[]{event.getEventType(), event.getMessage()});
+                    client.reconnect();
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        final String tweet = messageQueue.poll();
+        if (tweet == null) {
+            context.yield();
+            return;
+        }
+
+        FlowFile flowFile = session.create();
+        flowFile = session.write(flowFile, new OutputStreamCallback() {
+            @Override
+            public void process(final OutputStream out) throws IOException {
+                out.write(tweet.getBytes(StandardCharsets.UTF_8));
+            }
+        });
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+        attributes.put(CoreAttributes.FILENAME.key(), 
flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".json");
+        flowFile = session.putAllAttributes(flowFile, attributes);
+
+        session.transfer(flowFile, REL_SUCCESS);
+        session.getProvenanceReporter().receive(flowFile, 
Constants.STREAM_HOST + client.getEndpoint().getURI().toString());
     }
 
     private static class FollowingValidator implements Validator {
-       private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
-       
-               @Override
-               public ValidationResult validate(final String subject, final 
String input, final ValidationContext context) {
-                       final String[] splits = input.split(",");
-                       for ( final String split : splits ) {
-                               if ( 
!NUMBER_PATTERN.matcher(split.trim()).matches() ) {
-                                       return new 
ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("Must
 be comma-separted list of User ID's").build();
-                               }
-                       }
-                       
-                       return new 
ValidationResult.Builder().subject(subject).input(input).valid(true).build();
-               }
-       
+
+        private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
+
+        @Override
+        public ValidationResult validate(final String subject, final String 
input, final ValidationContext context) {
+            final String[] splits = input.split(",");
+            for (final String split : splits) {
+                if (!NUMBER_PATTERN.matcher(split.trim()).matches()) {
+                    return new 
ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("Must
 be comma-separted list of User ID's").build();
+                }
+            }
+
+            return new 
ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+        }
+
     }
 }

Reply via email to