Resolves STREAMS-416

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0b512d8b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0b512d8b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0b512d8b

Branch: refs/heads/master
Commit: 0b512d8b6c0d2b36a635d20b9be9a481eb67d6b9
Parents: 2c12724
Author: Elias Ponvert <eponv...@gmail.com>
Authored: Mon Oct 17 10:37:51 2016 -0500
Committer: Elias Ponvert <eponv...@gmail.com>
Committed: Mon Oct 17 10:37:51 2016 -0500

----------------------------------------------------------------------
 streams-contrib/pom.xml                         |   5 -
 .../streams-persist-cassandra/pom.xml           | 168 ----
 .../configuration/CassandraConfiguration.java   |  81 --
 .../model/CassandraActivityStreamsEntry.java    |  45 -
 .../CassandraActivityStreamsRepository.java     | 176 ----
 .../repository/impl/CassandraKeyspace.java      |  61 --
 .../impl/CassandraSubscriptionRepository.java   |  69 --
 .../spring/streams-cassandra-context.xml        |  25 -
 .../CassandraActivityStreamsRepositoryTest.java |  99 ---
 .../impl/CassandraActivitySubscriptionTest.java |  54 --
 .../streams-provider-datasift/README.md         |  18 -
 .../streams-provider-datasift/pom.xml           | 242 ------
 .../streams/datasift/csdl/DatasiftCsdlUtil.java | 132 ---
 .../DatasiftActivitySerializerProcessor.java    |  97 ---
 .../DatasiftTypeConverterProcessor.java         | 167 ----
 .../datasift/provider/DatasiftConverter.java    |  38 -
 .../provider/DatasiftManagedSourceSetup.java    |  94 ---
 .../datasift/provider/DatasiftPushProvider.java | 270 ------
 .../provider/DatasiftStreamConfigurator.java    |  52 --
 .../provider/DatasiftStreamProvider.java        | 242 ------
 .../streams/datasift/provider/ErrorHandler.java |  47 --
 .../streams/datasift/provider/Subscription.java |  60 --
 .../serializer/DatasiftActivitySerializer.java  |  65 --
 .../serializer/DatasiftEventClassifier.java     |  53 --
 .../DatasiftInstagramActivitySerializer.java    | 125 ---
 .../DatasiftInteractionActivitySerializer.java  | 247 ------
 .../DatasiftTwitterActivitySerializer.java      | 271 ------
 .../datasift/util/StreamsDatasiftMapper.java    |  89 --
 .../org/apache/streams/datasift/Datasift.json   | 462 -----------
 .../streams/datasift/DatasiftConfiguration.json | 136 ----
 .../datasift/DatasiftPushConfiguration.json     |  20 -
 .../datasift/DatasiftStreamConfiguration.json   |  20 -
 .../streams/datasift/DatasiftWebhookData.json   |  35 -
 .../datasift/facebook/DatasiftFacebook.json     | 125 ---
 .../datasift/instagram/DatasiftInstagram.json   | 183 -----
 .../interaction/DatasiftInteraction.json        |  97 ---
 .../datasift/twitter/DatasiftTwitter.json       | 370 ---------
 .../datasift/twitter/DatasiftTwitterMedia.json  | 132 ---
 .../datasift/twitter/DatasiftTwitterUser.json   |  73 --
 .../src/main/resources/datasift.conf            |  25 -
 .../src/site/markdown/index.md                  |  27 -
 .../com/datasift/test/DatasiftSerDeTest.java    |  75 --
 .../provider/DatasiftStreamProviderTest.java    | 144 ----
 .../datasift/provider/ErrorHandlerTest.java     |  40 -
 .../datasift/provider/SubscriptionTest.java     |  58 --
 .../DatasiftActivitySerializerIT.java           | 110 ---
 .../serializer/DatasiftEventClassifierTest.java |  71 --
 .../DatasiftInstagramActivitySerializerIT.java  |  57 --
 ...DatasiftInteractionActivitySerializerIT.java |  62 --
 .../DatasiftTwitterActivitySerializerIT.java    |  57 --
 .../gnip-edc-facebook/pom.xml                   | 152 ----
 .../test/FacebookEDCAsActivityTest.java         |  93 ---
 .../facebook/test/FacebookEDCSerDeTest.java     |  74 --
 .../gnip-edc-flickr/pom.xml                     | 139 ----
 .../flickr/test/FlickrEDCAsActivityTest.java    |  92 ---
 .../gnip/flickr/test/FlickrEDCSerDeTest.java    |  76 --
 .../gnip-edc-googleplus/pom.xml                 | 102 ---
 .../com/gplus/api/GPlusActivitySerializer.java  |  93 ---
 .../com/gplus/api/GPlusEDCAsActivityTest.java   |  95 ---
 .../gnip-edc-instagram/pom.xml                  | 116 ---
 .../jsonschema/com/instagram/Instagram.json     | 208 -----
 .../com/instagram/test/InstagramSerDeTest.java  |  70 --
 .../gnip-edc-reddit/pom.xml                     | 103 ---
 .../reddit/api/RedditActivitySerializer.java    | 107 ---
 .../reddit/api/RedditEDCAsActivityJSONTest.java |  97 ---
 .../gnip-edc-youtube/pom.xml                    | 139 ----
 .../java/com/gnip/test/YouTubeEDCSerDeTest.java |  79 --
 .../com/gnip/test/YoutubeEDCAsActivityTest.java |  87 --
 .../gnip-powertrack/README.md                   |   8 -
 .../gnip-powertrack/pom.xml                     | 207 -----
 .../ActivityXMLActivitySerializer.java          | 240 ------
 .../gnip/powertrack/GnipActivityFixer.java      | 151 ----
 .../PowerTrackActivitySerializer.java           | 121 ---
 .../src/main/jsonschema/com/gnip/Gnip.json      | 815 -------------------
 .../src/main/xmlschema/com/gnip/binding.xjb     |  33 -
 .../src/main/xmlschema/com/gnip/entry.xsd       | 398 ---------
 .../gnip-powertrack/src/site/markdown/index.md  |  15 -
 .../test/PowerTrackDeserializationTest.java     |  55 --
 streams-contrib/streams-provider-gnip/pom.xml   |  67 --
 79 files changed, 9503 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index 8db46c1..b50f440 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -37,7 +37,6 @@
     </properties>
 
     <modules>
-        <!--<module>streams-persist-cassandra</module>-->
         <module>streams-persist-console</module>
         <module>streams-persist-elasticsearch</module>
         <module>streams-persist-filebuffer</module>
@@ -47,16 +46,12 @@
         <module>streams-persist-kafka</module>
         <module>streams-persist-mongo</module>
         <module>streams-amazon-aws</module>
-        <!--<module>streams-processor-lucene</module>-->
-        <!--<module>streams-processor-tika</module>-->
         <module>streams-processor-jackson</module>
         <module>streams-processor-json</module>
         <module>streams-processor-urls</module>
         <module>streams-processor-peoplepattern</module>
-        <module>streams-provider-datasift</module>
         <module>streams-provider-facebook</module>
         <module>streams-provider-google</module>
-        <module>streams-provider-gnip</module>
         <module>streams-provider-instagram</module>
         <module>streams-provider-moreover</module>
         <module>streams-provider-twitter</module>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-persist-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/pom.xml 
b/streams-contrib/streams-persist-cassandra/pom.xml
deleted file mode 100644
index 578004c..0000000
--- a/streams-contrib/streams-persist-cassandra/pom.xml
+++ /dev/null
@@ -1,168 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~
-  ~   http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing,
-  ~ software distributed under the License is distributed on an
-  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  ~ KIND, either express or implied.  See the License for the
-  ~ specific language governing permissions and limitations
-  ~ under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
-
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.apache.streams</groupId>
-        <artifactId>streams-contrib</artifactId>
-        <version>0.4-incubating-SNAPSHOT</version>
-    </parent>
-
-    <artifactId>streams-persist-cassandra</artifactId>
-    <name>${project.artifactId}</name>
-
-    <description>Cassandra Module</description>
-
-    <properties>
-        <bundle.symbolicName>streams-persist-cassandra</bundle.symbolicName>
-        <bundle.namespace>org.apache.streams</bundle.namespace>
-        <easymock.version>3.2</easymock.version>
-    </properties>
-
-    <packaging>bundle</packaging>
-    <build>
-        <resources>
-        <resource>
-            <directory>src/main/resources</directory>
-        </resource>
-
-            <resource>
-                <directory>.</directory>
-                <includes>
-                    <include>plugin.xml</include>
-                    <include>plugin.properties</include>
-                    <include>icons/**</include>
-                </includes>
-            </resource>
-        </resources>
-    <plugins>
-        <plugin>
-            <groupId>org.ops4j</groupId>
-            <artifactId>maven-pax-plugin</artifactId>
-            <!--
-             | enable improved OSGi compilation support for the bundle 
life-cycle.
-             | to switch back to the standard bundle life-cycle, move this 
setting
-             | down to the maven-bundle-plugin section
-            -->
-            <extensions>true</extensions>
-        </plugin>
-        <plugin>
-            <groupId>org.apache.felix</groupId>
-            <artifactId>maven-bundle-plugin</artifactId>
-            <version>1.4.3</version>
-            <!--
-             | the following instructions build a simple set of public/private 
classes into an OSGi bundle
-            -->
-            <configuration>
-                <instructions>
-                    
<Bundle-SymbolicName>${bundle.symbolicName}</Bundle-SymbolicName>
-                    <Bundle-Version>${project.version}</Bundle-Version>
-                    <Export-Package>
-                        
${bundle.namespace};version="${project.version}",org.apache.streams.cassandra.repository.impl,
 org.apache.streams.cassandra.model, org.apache.streams.cassandra.configuration
-                    </Export-Package>
-                    
<Private-Package>${bundle.namespace}.cassandra.repository.impl.*,${bundle.namespace}.cassandra.model,
 ${bundle.namespace}.cassandra.configuration </Private-Package>
-                    <Import-Package>
-                        
org.apache.rave.model,org.apache.rave.portal.model.impl,
-                        com.datastax.driver.core, 
com.datastax.driver.core.exceptions, org.codehaus.jackson.map.annotate,
-                        javax.persistence, org.apache.commons.logging, 
com.google.common.collect, org.codehaus.jackson.map,
-                        org.apache.commons.lang,
-                        org.apache.streams.osgi.components.activitysubscriber,
-                        org.springframework.beans.factory.annotation, 
org.springframework.stereotype
-                    </Import-Package>
-                </instructions>
-            </configuration>
-        </plugin>
-    </plugins>
-    </build>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.rave</groupId>
-            <artifactId>rave-core-api</artifactId>
-            <version>${rave.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.rave</groupId>
-            <artifactId>rave-core</artifactId>
-            <version>${rave.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>commons-logging</groupId>
-                    <artifactId>commons-logging</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>4.8.2</version>
-        </dependency>
-
-        <dependency>
-            <groupId>com.datastax.cassandra</groupId>
-            <artifactId>cassandra-driver-core</artifactId>
-            <version>${datastax.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>commons-logging</groupId>
-                    <artifactId>commons-logging</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.jboss.netty</groupId>
-            <artifactId>netty</artifactId>
-            <version>3.2.9.Final</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.streams.osgi.components</groupId>
-            <artifactId>activity-subscriber</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.easymock</groupId>
-            <artifactId>easymock</artifactId>
-            <version>${easymock.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.streams</groupId>
-            <artifactId>streams-testing</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-            <type>test-jar</type>
-        </dependency>
-    </dependencies>
-
-</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/configuration/CassandraConfiguration.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/configuration/CassandraConfiguration.java
 
b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/configuration/CassandraConfiguration.java
deleted file mode 100644
index 195467d..0000000
--- 
a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/configuration/CassandraConfiguration.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.cassandra.configuration;
-
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-@Component
-public class CassandraConfiguration {
-    @Value("${keyspaceName}")
-    private String keyspaceName;
-
-    @Value("${activitystreamsColumnFamilyName}")
-    private String activitystreamsColumnFamilyName;
-
-    @Value("${subscriptionColumnFamilyName}")
-    private String subscriptionColumnFamilyName;
-
-    @Value("${publisherColumnFamilyName}")
-    private String publisherColumnFamilyName;
-
-    @Value("${cassandraPort}")
-    private String cassandraPort;
-
-    public String getKeyspaceName() {
-        return keyspaceName;
-    }
-
-    public void setKeyspaceName(String keyspaceName) {
-        this.keyspaceName = keyspaceName;
-    }
-
-    public String getActivitystreamsColumnFamilyName() {
-        return activitystreamsColumnFamilyName;
-    }
-
-    public void setActivitystreamsColumnFamilyName(String 
activitystreamsColumnFamilyName) {
-        this.activitystreamsColumnFamilyName = activitystreamsColumnFamilyName;
-    }
-
-    public String getSubscriptionColumnFamilyName() {
-        return subscriptionColumnFamilyName;
-    }
-
-    public void setSubscriptionColumnFamilyName(String 
subscriptionColumnFamilyName) {
-        this.subscriptionColumnFamilyName = subscriptionColumnFamilyName;
-    }
-
-    public String getPublisherColumnFamilyName() {
-        return publisherColumnFamilyName;
-    }
-
-    public void setPublisherColumnFamilyName(String publisherColumnFamilyName) 
{
-        this.publisherColumnFamilyName = publisherColumnFamilyName;
-    }
-
-    public String getCassandraPort() {
-        return cassandraPort;
-    }
-
-    public void setCassandraPort(String cassandraPort) {
-        this.cassandraPort = cassandraPort;
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java
 
b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java
deleted file mode 100644
index 88db8aa..0000000
--- 
a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.cassandra.model;
-
-import org.apache.rave.model.ActivityStreamsObject;
-import org.apache.rave.portal.model.impl.ActivityStreamsEntryImpl;
-import org.apache.rave.portal.model.impl.ActivityStreamsObjectImpl;
-import org.codehaus.jackson.map.annotate.JsonDeserialize;
-
-import java.util.Date;
-
-public class CassandraActivityStreamsEntry extends ActivityStreamsEntryImpl 
implements Comparable<CassandraActivityStreamsEntry>{
-
-    @JsonDeserialize(as=ActivityStreamsObjectImpl.class)
-    private ActivityStreamsObject object;
-
-    @JsonDeserialize(as=ActivityStreamsObjectImpl.class)
-    private ActivityStreamsObject target;
-
-    @JsonDeserialize(as=ActivityStreamsObjectImpl.class)
-    private ActivityStreamsObject actor;
-
-    @JsonDeserialize(as=ActivityStreamsObjectImpl.class)
-    private ActivityStreamsObject provider;
-
-    public int compareTo(CassandraActivityStreamsEntry entry){
-        return (this.getPublished()).compareTo(entry.getPublished());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
 
b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
deleted file mode 100644
index 56e5416..0000000
--- 
a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.cassandra.repository.impl;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.exceptions.AlreadyExistsException;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.rave.model.ActivityStreamsEntry;
-import org.apache.rave.model.ActivityStreamsObject;
-import org.apache.rave.portal.model.impl.ActivityStreamsEntryImpl;
-import org.apache.rave.portal.model.impl.ActivityStreamsObjectImpl;
-import org.apache.streams.cassandra.configuration.CassandraConfiguration;
-import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-
-public class CassandraActivityStreamsRepository {
-
-    private static final Log LOG = 
LogFactory.getLog(CassandraActivityStreamsRepository.class);
-
-    private CassandraKeyspace keyspace;
-    private CassandraConfiguration configuration;
-
-    @Autowired
-    public CassandraActivityStreamsRepository(CassandraKeyspace keyspace, 
CassandraConfiguration configuration) {
-        this.configuration = configuration;
-        this.keyspace = keyspace;
-
-        try {
-            keyspace.getSession().execute("CREATE TABLE " + 
configuration.getActivitystreamsColumnFamilyName() + " (" +
-                    "id text, " +
-                    "published timestamp, " +
-                    "verb text, " +
-                    "tags text, " +
-
-                    "actor_displayname text, " +
-                    "actor_id text, " +
-                    "actor_url text, " +
-                    "actor_objecttype text, " +
-
-                    "target_displayname text, " +
-                    "target_id text, " +
-                    "target_url text, " +
-
-                    "provider_url text, " +
-
-                    "object_url text, " +
-                    "object_displayname text, " +
-                    "object_id text, " +
-                    "object_objecttype text, " +
-
-                    "PRIMARY KEY (id, tags, published));");
-        } catch (AlreadyExistsException ignored) {
-        }
-    }
-
-    public void save(ActivityStreamsEntry entry) {
-        String sql = "INSERT INTO " + 
configuration.getActivitystreamsColumnFamilyName() + " (" +
-                "id, published, verb, tags, " +
-                "actor_displayname, actor_objecttype, actor_id, actor_url, " +
-                "target_displayname, target_id, target_url, " +
-                "provider_url, " +
-                "object_displayname, object_objecttype, object_id, object_url) 
" +
-                "VALUES ('" +
-                entry.getId() + "','" +
-                entry.getPublished().getTime() + "','" +
-                entry.getVerb() + "','" +
-                entry.getTags() + "','" +
-
-                entry.getActor().getDisplayName() + "','" +
-                entry.getActor().getObjectType() + "','" +
-                entry.getActor().getId() + "','" +
-                entry.getActor().getUrl() + "','" +
-
-                entry.getTarget().getDisplayName() + "','" +
-                entry.getTarget().getId() + "','" +
-                entry.getTarget().getUrl() + "','" +
-
-                entry.getProvider().getUrl() + "','" +
-
-                entry.getObject().getDisplayName() + "','" +
-                entry.getObject().getObjectType() + "','" +
-                entry.getObject().getId() + "','" +
-                entry.getObject().getUrl() +
-
-                "')";
-        keyspace.getSession().execute(sql);
-    }
-
-    public List<CassandraActivityStreamsEntry> 
getActivitiesForFilters(List<String> filters, Date lastUpdated) {
-        List<CassandraActivityStreamsEntry> results = new 
ArrayList<CassandraActivityStreamsEntry>();
-
-        for (String tag : filters) {
-            String cql = "SELECT * FROM " + 
configuration.getActivitystreamsColumnFamilyName() + " WHERE ";
-
-            //add filters
-            cql = cql + " tags = '" + tag + "' AND ";
-
-            //specify last modified
-            cql = cql + "published > " + lastUpdated.getTime() + " ALLOW 
FILTERING";
-
-            //execute the cql query and store the results
-            ResultSet set = keyspace.getSession().execute(cql);
-
-            //iterate through the results and create a new 
ActivityStreamsEntry for every result returned
-
-            for (Row row : set) {
-                CassandraActivityStreamsEntry entry = new 
CassandraActivityStreamsEntry();
-                ActivityStreamsObject actor = new ActivityStreamsObjectImpl();
-                ActivityStreamsObject target = new ActivityStreamsObjectImpl();
-                ActivityStreamsObject object = new ActivityStreamsObjectImpl();
-                ActivityStreamsObject provider = new 
ActivityStreamsObjectImpl();
-
-                actor.setDisplayName(row.getString("actor_displayname"));
-                actor.setId(row.getString("actor_id"));
-                actor.setObjectType(row.getString("actor_objecttype"));
-                actor.setUrl(row.getString("actor_url"));
-
-                target.setDisplayName(row.getString("target_displayname"));
-                target.setId(row.getString("target_id"));
-                target.setUrl(row.getString("target_url"));
-
-                object.setDisplayName(row.getString("object_displayname"));
-                object.setObjectType(row.getString("object_objecttype"));
-                object.setUrl(row.getString("object_url"));
-                object.setId(row.getString("object_id"));
-
-                provider.setUrl(row.getString("provider_url"));
-
-                entry.setPublished(row.getDate("published"));
-                entry.setVerb(row.getString("verb"));
-                entry.setId(row.getString("id"));
-                entry.setTags(row.getString("tags"));
-                entry.setActor(actor);
-                entry.setTarget(target);
-                entry.setObject(object);
-                entry.setProvider(provider);
-
-                results.add(entry);
-            }
-        }
-
-        return results;
-    }
-
-    public void dropTable(String table) {
-        String cql = "DROP TABLE " + table;
-        keyspace.getSession().execute(cql);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraKeyspace.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraKeyspace.java
 
b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraKeyspace.java
deleted file mode 100644
index 9106a51..0000000
--- 
a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraKeyspace.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.cassandra.repository.impl;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.exceptions.AlreadyExistsException;
-import org.apache.streams.cassandra.configuration.CassandraConfiguration;
-import org.springframework.beans.factory.annotation.Autowired;
-
-public class CassandraKeyspace {
-    private CassandraConfiguration configuration;
-    private Cluster cluster;
-    private Session session;
-
-    @Autowired
-    public CassandraKeyspace(CassandraConfiguration configuration){
-        this.configuration = configuration;
-
-        cluster = 
Cluster.builder().addContactPoint(configuration.getCassandraPort()).build();
-        session = cluster.connect();
-
-        //TODO: cassandra 2 will have support for CREATE KEYSPACE IF NOT EXISTS
-        try {
-            session.execute("CREATE KEYSPACE " + 
configuration.getKeyspaceName() + " WITH replication = { 'class': 
'SimpleStrategy','replication_factor' : 1 };");
-        } catch (AlreadyExistsException ignored) {
-        }
-
-        //connect to the keyspace
-        session = cluster.connect(configuration.getKeyspaceName());
-    }
-
-    public Session getSession(){
-        return session;
-    }
-
-    @Override
-    protected void finalize() throws Throwable {
-        try {
-            cluster.shutdown();
-        } finally {
-            super.finalize();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraSubscriptionRepository.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraSubscriptionRepository.java
 
b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraSubscriptionRepository.java
deleted file mode 100644
index f5fe471..0000000
--- 
a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraSubscriptionRepository.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.cassandra.repository.impl;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.exceptions.AlreadyExistsException;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.streams.cassandra.configuration.CassandraConfiguration;
-import 
org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscription;
-import org.springframework.beans.factory.annotation.Autowired;
-
-public class CassandraSubscriptionRepository {
-    private static final Log LOG = 
LogFactory.getLog(CassandraSubscriptionRepository.class);
-
-    private CassandraKeyspace keyspace;
-    private CassandraConfiguration configuration;
-
-    @Autowired
-    public CassandraSubscriptionRepository(CassandraKeyspace keyspace, 
CassandraConfiguration configuration) {
-        this.keyspace = keyspace;
-        this.configuration = configuration;
-
-        try {
-            keyspace.getSession().execute("CREATE TABLE " + 
configuration.getSubscriptionColumnFamilyName() + " (" +
-                    "id text, " +
-                    "filters text, " +
-
-                    "PRIMARY KEY (id));");
-        } catch (AlreadyExistsException ignored) {
-        }
-    }
-
-    public String getFilters(String id){
-        String cql = "SELECT * FROM " + 
configuration.getSubscriptionColumnFamilyName()  + " WHERE id = '" + id+"';";
-
-        ResultSet set = keyspace.getSession().execute(cql);
-
-        return set.one().getString("filters");
-    }
-
-    public void save(ActivityStreamsSubscription subscription){
-        String cql = "INSERT INTO " + 
configuration.getSubscriptionColumnFamilyName()  + " (" +
-                "id, filters) " +
-                "VALUES ('" +
-                subscription.getAuthToken() + "','" +
-                StringUtils.join(subscription.getFilters(), " ") +
-
-                "')";
-        keyspace.getSession().execute(cql);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-persist-cassandra/src/main/resources/META_INF/spring/streams-cassandra-context.xml
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-cassandra/src/main/resources/META_INF/spring/streams-cassandra-context.xml
 
b/streams-contrib/streams-persist-cassandra/src/main/resources/META_INF/spring/streams-cassandra-context.xml
deleted file mode 100644
index 842c918..0000000
--- 
a/streams-contrib/streams-persist-cassandra/src/main/resources/META_INF/spring/streams-cassandra-context.xml
+++ /dev/null
@@ -1,25 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~
-  ~   http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing,
-  ~ software distributed under the License is distributed on an
-  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  ~ KIND, either express or implied.  See the License for the
-  ~ specific language governing permissions and limitations
-  ~ under the License.
-  -->
-<beans
-        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-        xmlns="http://www.springframework.org/schema/beans";
-        xmlns:context="http://www.springframework.org/schema/context";
-        xsi:schemaLocation="http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans.xsd
-        http://www.springframework.org/schema/context 
http://www.springframework.org/schema/context/spring-context.xsd";>
-
-</beans>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
 
b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
deleted file mode 100644
index 978af10..0000000
--- 
a/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.cassandra.repository.impl;
-
-import com.datastax.driver.core.ResultSet;
-import org.apache.rave.model.ActivityStreamsEntry;
-import org.apache.rave.model.ActivityStreamsObject;
-import org.apache.rave.portal.model.impl.ActivityStreamsEntryImpl;
-import org.apache.rave.portal.model.impl.ActivityStreamsObjectImpl;
-import org.apache.streams.cassandra.configuration.CassandraConfiguration;
-import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
-import static org.easymock.EasyMock.*;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-
-public class CassandraActivityStreamsRepositoryTest {
-
-    private CassandraActivityStreamsRepository repository;
-
-
-    @Before
-    public void setup() {
-        CassandraKeyspace keyspace = createMock(CassandraKeyspace.class);
-        CassandraConfiguration configuration = 
createMock(CassandraConfiguration.class);
-        repository = new CassandraActivityStreamsRepository(keyspace, 
configuration);
-    }
-
-    @Ignore
-    @Test
-    public void saveActivity() {
-        ActivityStreamsEntry entry = new ActivityStreamsEntryImpl();
-        ActivityStreamsObject actor = new ActivityStreamsObjectImpl();
-        ActivityStreamsObject target = new ActivityStreamsObjectImpl();
-        ActivityStreamsObject object = new ActivityStreamsObjectImpl();
-        ActivityStreamsObject provider = new ActivityStreamsObjectImpl();
-
-        actor.setId("actorid1");
-        actor.setUrl("actorurl1");
-        actor.setDisplayName("actorname1");
-
-        target.setId("targetid1");
-        target.setUrl("targeturl1");
-        target.setDisplayName("r501");
-
-        provider.setUrl("providerurl");
-
-        object.setId("objectid1");
-        object.setDisplayName("objectname1");
-
-        entry.setId("dink");
-        entry.setVerb("verb1");
-        entry.setTags("r501");
-        entry.setProvider(provider);
-        Date d = new Date();
-        entry.setPublished(d);
-        entry.setActor(actor);
-        entry.setObject(object);
-        entry.setTarget(target);
-
-        repository.save(entry);
-    }
-
-    @Ignore
-    @Test
-    public void getActivity() {
-        String cql = "tags";
-        String other = "r501";
-        List<String> f = Arrays.asList(cql, other);
-        Date d = new Date(0);
-        List<CassandraActivityStreamsEntry> results = 
repository.getActivitiesForFilters(f,d);
-    }
-
-    @Ignore
-    @Test
-    public void dropTableTest(){
-        repository.dropTable("coltest");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivitySubscriptionTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivitySubscriptionTest.java
 
b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivitySubscriptionTest.java
deleted file mode 100644
index 2a90462..0000000
--- 
a/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivitySubscriptionTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.cassandra.repository.impl;
-
-import 
org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscription;
-import 
org.apache.streams.osgi.components.activitysubscriber.impl.ActivityStreamsSubscriptionImpl;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.Arrays;
-
-public class CassandraActivitySubscriptionTest {
-
-    public CassandraSubscriptionRepository repository;
-
-
-    @Before
-    public void setup() {
-//        repository = new CassandraSubscriptionRepository();
-    }
-
-    @Ignore
-    @Test
-    public void saveTest(){
-        ActivityStreamsSubscription subscription = new 
ActivityStreamsSubscriptionImpl();
-        subscription.setFilters(Arrays.asList("thisis", "atest"));
-        subscription.setAuthToken("subid");
-
-        repository.save(subscription);
-    }
-
-    @Ignore
-    @Test
-    public void getTest(){
-        String filters = repository.getFilters("subid");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-provider-datasift/README.md
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/README.md 
b/streams-contrib/streams-provider-datasift/README.md
deleted file mode 100644
index a29fb16..0000000
--- a/streams-contrib/streams-provider-datasift/README.md
+++ /dev/null
@@ -1,18 +0,0 @@
-streams-provider-datasift
-=====================
-
-Datasift Provider
-
-Example configuration:
-
-    datasift {
-        apiKey = ""
-        userName = ""
-        hashes = [
-            "b8aaf7cec5faa2fadbd55d651933a31e",
-            "f41f054e2a2ba8d2e7b0d74f56e727d6"
-        ]
-    }
-    
-
-

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-provider-datasift/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/pom.xml 
b/streams-contrib/streams-provider-datasift/pom.xml
deleted file mode 100644
index def7826..0000000
--- a/streams-contrib/streams-provider-datasift/pom.xml
+++ /dev/null
@@ -1,242 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~
-  ~   http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing,
-  ~ software distributed under the License is distributed on an
-  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  ~ KIND, either express or implied.  See the License for the
-  ~ specific language governing permissions and limitations
-  ~ under the License.
-  -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-    <parent>
-        <groupId>org.apache.streams</groupId>
-        <artifactId>streams-contrib</artifactId>
-        <version>0.4-incubating-SNAPSHOT</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>streams-provider-datasift</artifactId>
-    <name>${project.artifactId}</name>
-
-    <description>Datasift Provider</description>
-
-    <properties>
-        <skipITs>true</skipITs>
-        
<testDataBaseURl>http://streams.peoplepattern.com.s3.amazonaws.com/test-data/</testDataBaseURl>
-    </properties>
-
-
-    <dependencies>
-        <dependency>
-            <groupId>com.typesafe</groupId>
-            <artifactId>config</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.streams</groupId>
-            <artifactId>streams-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.streams</groupId>
-            <artifactId>streams-config</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.streams</groupId>
-            <artifactId>streams-pojo</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.streams</groupId>
-            <artifactId>streams-processor-jackson</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.streams</groupId>
-            <artifactId>streams-provider-twitter</artifactId>
-            <version>${project.version}</version>
-            <optional>true</optional>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.streams</groupId>
-            <artifactId>streams-provider-instagram</artifactId>
-            <version>${project.version}</version>
-            <optional>true</optional>
-        </dependency>
-        <dependency>
-            <groupId>com.datasift.client</groupId>
-            <artifactId>datasift-java</artifactId>
-            <version>3.2.6</version>
-            <exclusions>
-                <exclusion>
-                    <artifactId>slf4j-log4j12</artifactId>
-                    <groupId>org.slf4j</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>log4j</artifactId>
-                    <groupId>log4j</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>commons-logging</artifactId>
-                    <groupId>commons-logging</groupId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.boundary</groupId>
-                    <artifactId>high-scale-lib</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>com.boundary</groupId>
-            <artifactId>high-scale-lib</artifactId>
-            <version>1.0.6</version>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>jcl-over-slf4j</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-annotations</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.jsonschema2pojo</groupId>
-            <artifactId>jsonschema2pojo-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-all</artifactId>
-            <version>1.9.5</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.streams</groupId>
-            <artifactId>streams-util</artifactId>
-            <type>test-jar</type>
-            <scope>test</scope>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.streams</groupId>
-            <artifactId>streams-testing</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-            <type>test-jar</type>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <sourceDirectory>src/main/java</sourceDirectory>
-        <testSourceDirectory>src/test/java17</testSourceDirectory>
-        <resources>
-            <resource>
-                <directory>src/main/resources</directory>
-            </resource>
-        </resources>
-        <testResources>
-            <testResource>
-                <directory>src/test/resources</directory>
-            </testResource>
-        </testResources>
-        <plugins>
-            <plugin>
-                <groupId>org.codehaus.mojo</groupId>
-                <artifactId>build-helper-maven-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>add-source</id>
-                        <phase>generate-sources</phase>
-                        <goals>
-                            <goal>add-source</goal>
-                        </goals>
-                        <configuration>
-                            <sources>
-                                
<source>target/generated-sources/jsonschema2pojo</source>
-                            </sources>
-                        </configuration>
-                    </execution>
-                    <execution>
-                        <id>add-test-source</id>
-                        <phase>generate-sources</phase>
-                        <goals>
-                            <goal>add-test-source</goal>
-                        </goals>
-                        <configuration>
-                            <sources>
-                                <source>src/test/java17</source>
-                            </sources>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.jsonschema2pojo</groupId>
-                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
-                <configuration>
-                    <addCompileSourceRoot>true</addCompileSourceRoot>
-                    <generateBuilders>true</generateBuilders>
-                    
<sourceDirectory>${project.basedir}/src/main/jsonschema</sourceDirectory>
-                    
<outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
-                    <targetPackage>org.apache.streams.datasift</targetPackage>
-                    <useLongIntegers>true</useLongIntegers>
-                    <useJodaDates>true</useJodaDates>
-                </configuration>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>generate</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>com.googlecode.maven-download-plugin</groupId>
-                <artifactId>download-maven-plugin</artifactId>
-                <version>1.2.1</version>
-                <executions>
-                    <execution>
-                        <id>download-it-data</id>
-                        <phase>pre-integration-test</phase>
-                        <goals>
-                            <goal>wget</goal>
-                        </goals>
-                        <configuration>
-                            
<url>${testDataBaseURl}/${project.artifactId}.zip</url>
-                            <unpack>true</unpack>
-                            
<outputDirectory>${project.build.directory}/test-classes</outputDirectory>
-                            <!--<md5>df65b5642f33676313ebe4d5b69a3fff</md5>-->
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-failsafe-plugin</artifactId>
-                <configuration>
-                    <skipTests>${skipITs}</skipTests>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-resources-plugin</artifactId>
-            </plugin>
-
-        </plugins>
-
-    </build>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/csdl/DatasiftCsdlUtil.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/csdl/DatasiftCsdlUtil.java
 
b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/csdl/DatasiftCsdlUtil.java
deleted file mode 100644
index 049ed8c..0000000
--- 
a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/csdl/DatasiftCsdlUtil.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.datasift.csdl;
-
-import com.google.common.collect.Lists;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.ListIterator;
-
-public class DatasiftCsdlUtil {
-
-       private static final Logger log = LoggerFactory
-                       .getLogger(DatasiftCsdlUtil.class);
-
-       public static String csdlFromTwitterUserIds(List<String> list) throws 
Exception {
-               
-               StringBuilder csdlBuilder = new StringBuilder();
-
-        csdlBuilder.append("twitter.user.id in [");
-        ListIterator<String> listIterator = 
Lists.newArrayList(list).listIterator();
-        while( listIterator.hasNext() ) {
-            csdlBuilder.append(listIterator.next());
-            if (listIterator.hasNext())
-                csdlBuilder.append(",");
-        }
-        csdlBuilder.append("]\n");
-        csdlBuilder.append(" OR\n");
-        csdlBuilder.append("twitter.in_reply_to_user_id contains_any \"");
-        listIterator = Lists.newArrayList(list).listIterator();
-        while( listIterator.hasNext() ) {
-            csdlBuilder.append(listIterator.next());
-            if (listIterator.hasNext())
-                csdlBuilder.append(",");
-        }
-        csdlBuilder.append("\"\n");
-        csdlBuilder.append(" OR\n");
-        csdlBuilder.append("twitter.mention_ids in [");
-        listIterator = Lists.newArrayList(list).listIterator();
-        while( listIterator.hasNext() ) {
-            csdlBuilder.append(listIterator.next());
-            if (listIterator.hasNext())
-                csdlBuilder.append(",");
-        }
-        csdlBuilder.append("]\n");
-
-               log.debug(csdlBuilder.toString());
-               
-               return csdlBuilder.toString();
-       }
-       
-       public static String csdlFromTwitterUserNames(List<String> list) throws 
Exception {
-
-        StringBuilder csdlBuilder = new StringBuilder();
-
-        csdlBuilder.append("twitter.user.screen_name contains_any \"");
-        ListIterator<String> listIterator = 
Lists.newArrayList(list).listIterator();
-        while( listIterator.hasNext() ) {
-            csdlBuilder.append(listIterator.next());
-            if (listIterator.hasNext())
-                csdlBuilder.append(",");
-        }
-        csdlBuilder.append("\"\n");
-        csdlBuilder.append(" OR\n");
-        csdlBuilder.append("twitter.in_reply_to_screen_name contains_any \"");
-        listIterator = Lists.newArrayList(list).listIterator();
-        while( listIterator.hasNext() ) {
-            csdlBuilder.append(listIterator.next());
-            if (listIterator.hasNext())
-                csdlBuilder.append(",");
-        }
-        csdlBuilder.append("\"\n");
-        csdlBuilder.append(" OR\n");
-        csdlBuilder.append("twitter.mentions contains_any \"");
-        listIterator = Lists.newArrayList(list).listIterator();
-        while( listIterator.hasNext() ) {
-            csdlBuilder.append(listIterator.next());
-            if (listIterator.hasNext())
-                csdlBuilder.append(",");
-        }
-        csdlBuilder.append("\"\n");
-
-        log.debug(csdlBuilder.toString());
-
-        return csdlBuilder.toString();
-       }
-
-    public static String csdlFromKeywords(List<String> include, List<String> 
exclude) throws Exception {
-
-        StringBuilder csdlBuilder = new StringBuilder();
-
-        csdlBuilder.append("interaction.content contains_any \"");
-        ListIterator<String> listIterator = 
Lists.newArrayList(include).listIterator();
-        while( listIterator.hasNext() ) {
-            csdlBuilder.append(listIterator.next());
-            if (listIterator.hasNext())
-                csdlBuilder.append(",");
-        }
-        csdlBuilder.append("\"\n");
-        csdlBuilder.append(" AND NOT ( \n");
-        csdlBuilder.append("interaction.content \"");
-        listIterator = Lists.newArrayList(exclude).listIterator();
-        while( listIterator.hasNext() ) {
-            csdlBuilder.append(listIterator.next());
-            if (listIterator.hasNext())
-                csdlBuilder.append(",");
-        }
-        csdlBuilder.append("\"\n");
-        csdlBuilder.append(")\n");
-
-        log.debug(csdlBuilder.toString());
-
-        return csdlBuilder.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftActivitySerializerProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftActivitySerializerProcessor.java
 
b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftActivitySerializerProcessor.java
deleted file mode 100644
index 43b16b2..0000000
--- 
a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftActivitySerializerProcessor.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.streams.datasift.processor;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.provider.DatasiftConverter;
-import org.apache.streams.datasift.serializer.DatasiftActivitySerializer;
-import org.apache.streams.datasift.util.StreamsDatasiftMapper;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- *
- */
-public class DatasiftActivitySerializerProcessor implements StreamsProcessor {
-
-    private final static String STREAMS_ID = 
"DatasiftActivitySerializerProcessor";
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(DatasiftActivitySerializerProcessor.class);
-
-    private ObjectMapper mapper;
-    private Class outClass;
-    private DatasiftActivitySerializer datasiftActivitySerializer;
-
-    public final static String TERMINATE = new String("TERMINATE");
-
-    public DatasiftActivitySerializerProcessor(Class outClass) {
-        this.outClass = outClass;
-    }
-
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
-
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-        List<StreamsDatum> result = Lists.newLinkedList();
-        Activity activity;
-        try {
-            Datasift node;
-            if( entry.getDocument() instanceof String ) {
-                node = this.mapper.readValue((String)entry.getDocument(), 
Datasift.class);
-            } else if( entry.getDocument() instanceof Datasift ) {
-                node = (Datasift) entry.getDocument();
-            } else {
-                node = this.mapper.convertValue(entry.getDocument(), 
Datasift.class);
-            }
-            if(node != null) {
-                activity = this.datasiftActivitySerializer.deserialize(node);
-                StreamsDatum datum = new StreamsDatum(activity, entry.getId(), 
entry.getTimestamp(), entry.getSequenceid());
-                datum.setMetadata(entry.getMetadata());
-                result.add(datum);
-            }
-        } catch (Exception e) {
-            LOGGER.error("Exception converting Datasift Interaction to 
"+this.outClass.getName()+ " : {}", e);
-        }
-        return result;
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-        this.mapper = 
StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsDatasiftMapper.DATASIFT_FORMAT));
-        this.datasiftActivitySerializer = new DatasiftActivitySerializer();
-    }
-
-    @Override
-    public void cleanUp() {
-
-    }
-
-};

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
 
b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
deleted file mode 100644
index dcffd1a..0000000
--- 
a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.streams.datasift.processor;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.provider.DatasiftConverter;
-import org.apache.streams.datasift.serializer.DatasiftActivitySerializer;
-import org.apache.streams.datasift.util.StreamsDatasiftMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.jackson.CleanAdditionalPropertiesProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- *
- */
-public class DatasiftTypeConverterProcessor implements StreamsProcessor {
-
-    private final static String STREAMS_ID = "RegexUrlExtractor";
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(DatasiftTypeConverterProcessor.class);
-
-    private ObjectMapper mapper;
-    private Class outClass;
-    private DatasiftActivitySerializer datasiftInteractionActivitySerializer;
-    private DatasiftConverter converter;
-
-    public final static String TERMINATE = new String("TERMINATE");
-
-    public DatasiftTypeConverterProcessor(Class outClass) {
-        this.outClass = outClass;
-    }
-
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
-
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-        List<StreamsDatum> result = Lists.newLinkedList();
-        Object doc;
-        try {
-            if( entry.getDocument() instanceof String ) {
-                ObjectNode node = 
this.mapper.readValue((String)entry.getDocument(), ObjectNode.class);
-                doc = this.converter.convert(node, this.mapper);
-            } else {
-                doc = this.converter.convert(entry.getDocument(), this.mapper);
-            }
-            if(doc != null) {
-                result.add(new StreamsDatum(doc, entry.getId()));
-            }
-        } catch (Exception e) {
-            LOGGER.error("Exception converting Datasift Interaction to 
"+this.outClass.getName()+ " : {}", e);
-        }
-        return result;
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-        this.mapper = StreamsDatasiftMapper.getInstance();
-        this.datasiftInteractionActivitySerializer = new 
DatasiftActivitySerializer();
-        if(this.outClass.equals(Activity.class)) {
-            this.converter = new ActivityConverter();
-        } else if (this.outClass.equals(String.class)) {
-            this.converter = new StringConverter();
-        } else {
-            LOGGER.warn("Using defaulting datasift converter");
-            this.converter = new DefaultConverter(this.outClass);
-        }
-    }
-
-    @Override
-    public void cleanUp() {
-
-    }
-
-    private class ActivityConverter implements DatasiftConverter {
-
-        @Override
-        public Object convert(Object toConvert, ObjectMapper mapper) {
-            if(toConvert instanceof Activity)
-                return toConvert;
-            try {
-                if(toConvert instanceof String)
-                    return 
datasiftInteractionActivitySerializer.deserialize((String) toConvert);
-                return mapper.convertValue(toConvert, Activity.class);
-            } catch (Exception e) {
-                LOGGER.error("Exception while trying to convert {} to a 
Activity.", toConvert.getClass());
-                LOGGER.error("Exception : {}", e);
-                e.printStackTrace();
-                return null;
-            }
-        }
-
-
-    }
-
-    private class StringConverter implements DatasiftConverter {
-        @Override
-        public Object convert(Object toConvert, ObjectMapper mapper) {
-            try {
-                if(toConvert instanceof String){
-                    return mapper.writeValueAsString(mapper.readValue((String) 
toConvert, Datasift.class));
-                } else {
-                    if(toConvert.getClass().equals(Activity.class)) { //hack 
to remove additional properties
-                        ObjectNode node = mapper.convertValue(toConvert, 
ObjectNode.class);
-                        
CleanAdditionalPropertiesProcessor.cleanAdditionalProperties(node);
-                        return mapper.writeValueAsString(node);
-                    } else
-                        return mapper.writeValueAsString(toConvert);
-                }
-            } catch (Exception e) {
-                LOGGER.error("Exception while trying to write {} as a 
String.", toConvert.getClass());
-                LOGGER.error("Exception : {}", e);
-                return null;
-            }
-        }
-    }
-
-    private class DefaultConverter implements DatasiftConverter {
-
-        private Class clazz;
-
-        public DefaultConverter(Class clazz) {
-            this.clazz = clazz;
-        }
-
-        @Override
-        public Object convert(Object toConvert, ObjectMapper mapper) {
-            try {
-                if(toConvert instanceof String) {
-                    return mapper.readValue((String) toConvert, this.clazz);
-                } else {
-                    return mapper.convertValue(toConvert, this.clazz);
-                }
-
-            } catch (Exception e) {
-                throw new RuntimeException("Failed converting +"+ 
toConvert.getClass().getName()+" to "+ this.clazz.getName());
-            }
-        }
-    }
-};

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftConverter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftConverter.java
 
b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftConverter.java
deleted file mode 100644
index f978205..0000000
--- 
a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftConverter.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.streams.datasift.provider;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-/**
- * Converts a {@link org.apache.streams.datasift.Datasift} object to a 
StreamsDatum
- */
-public interface DatasiftConverter {
-
-    /**
-     * Converts a datasift related object to the desired resulting object.
-     * @param toConvert
-     * @param mapper
-     * @return
-     */
-    public Object convert(Object toConvert, ObjectMapper mapper);
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftManagedSourceSetup.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftManagedSourceSetup.java
 
b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftManagedSourceSetup.java
deleted file mode 100644
index 8200ce2..0000000
--- 
a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftManagedSourceSetup.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.datasift.provider;
-
-import com.datasift.client.DataSiftClient;
-import com.datasift.client.managedsource.ManagedSource;
-import com.datasift.client.managedsource.ManagedSourceList;
-import com.datasift.client.managedsource.sources.DataSource;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Maps;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.datasift.DatasiftConfiguration;
-import org.apache.streams.datasift.managed.StreamsManagedSource;
-import org.apache.streams.datasift.util.StreamsDatasiftMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Created by sblackmon on 8/8/14.
- */
-public class DatasiftManagedSourceSetup implements Runnable {
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(DatasiftStreamProvider.class);
-
-    private static DatasiftConfiguration config = 
DatasiftStreamConfigurator.detectConfiguration(StreamsConfigurator.config);
-
-    private static final ObjectMapper MAPPER = 
StreamsDatasiftMapper.getInstance();
-
-    DataSiftClient client;
-    Map<String, ManagedSource> currentManagedSourceMap = Maps.newHashMap();
-    List<StreamsManagedSource> updatedManagedSourceList;
-
-    public static void main(String[] args) {
-        DatasiftManagedSourceSetup job = new DatasiftManagedSourceSetup();
-        (new Thread(job)).start();
-    }
-
-    @Override
-    public void run() {
-
-        setup();
-
-        current();
-
-        updatedManagedSourceList = config.getManagedSources();
-
-        for( StreamsManagedSource source : updatedManagedSourceList ) {
-            ManagedSource current = currentManagedSourceMap.get( 
source.getId() );
-            LOGGER.info( "CURRENT: " + current );
-            // merge 'em
-            ManagedSource working = MAPPER.convertValue(source, 
ManagedSource.class);
-            LOGGER.info( "WORKING: " + working );
-            ManagedSource updated = 
client.managedSource().update(current.getName(), (DataSource) working, 
current).sync();
-            LOGGER.info( "UPDATED: " + updated );
-
-        }
-
-    }
-
-    public void setup() {
-
-        client = new DatasiftStreamProvider(null, 
config).getNewClient(config.getUserName(), config.getApiKey());
-    }
-
-    public void current() {
-        ManagedSourceList managedSources = client.managedSource().get().sync();
-        Iterator<ManagedSource> managedSourceIterator = 
managedSources.iterator();
-        while( managedSourceIterator.hasNext() ) {
-            ManagedSource source = managedSourceIterator.next();
-            currentManagedSourceMap.put(source.getId(), source);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
 
b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
deleted file mode 100644
index bdd2c97..0000000
--- 
a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.streams.datasift.provider;
-
-import com.datasift.client.stream.DeletedInteraction;
-import com.datasift.client.stream.StreamEventListener;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.DatasiftConfiguration;
-import org.apache.streams.datasift.DatasiftWebhookData;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.util.ComponentUtils;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Resource;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.HttpHeaders;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import java.math.BigInteger;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
-
-/**
- * {@code DatasiftPushProvider} is an implementation of the {@link 
org.apache.streams.core.StreamsProvider} interface, with
- * annotations that allow it to bind as jersey resources within 
streams-runtime-dropwizard.
- *
- * Whereas GenericWebhookResource outputs ObjectNode datums, 
DatasiftPushProvider outputs Datasift datums, with
- * metadata when the json_meta endpoint is used.
- */
-@Resource
-@Path("/streams/webhooks/datasift")
-@Produces(MediaType.APPLICATION_JSON)
-@Consumes(MediaType.APPLICATION_JSON)
-public class DatasiftPushProvider implements StreamsProvider {
-
-    private final static String STREAMS_ID = "DatasiftPushProvider";
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(DatasiftPushProvider.class);
-
-    private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-    protected Queue<StreamsDatum> providerQueue = new 
ConcurrentLinkedQueue<>();
-
-    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
-    private static Pattern newLinePattern = Pattern.compile("(\\r\\n?|\\n)", 
Pattern.MULTILINE);
-
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
-
-    @POST
-    @Path("json")
-    public Response json(@Context HttpHeaders headers,
-                         String body) {
-
-        ObjectNode response = mapper.createObjectNode();
-
-        StreamsDatum datum = new StreamsDatum(body);
-
-        lock.writeLock().lock();
-        ComponentUtils.offerUntilSuccess(datum, providerQueue);
-        lock.writeLock().unlock();
-
-        Boolean success = true;
-
-        response.put("success", success);
-
-        return Response.status(200).entity(response).build();
-
-    }
-
-    @POST
-    @Path("json_new_line")
-    public Response json_new_line(@Context HttpHeaders headers,
-                                  String body) {
-
-        ObjectNode response = mapper.createObjectNode();
-
-        if (body.equalsIgnoreCase("{}")) {
-
-            Boolean success = true;
-
-            response.put("success", success);
-
-            return Response.status(200).entity(response).build();
-        }
-
-        try {
-
-            for( String item : Splitter.on(newLinePattern).split(body)) {
-                StreamsDatum datum = new StreamsDatum(item);
-
-                lock.writeLock().lock();
-                ComponentUtils.offerUntilSuccess(datum, providerQueue);
-                lock.writeLock().unlock();
-
-            }
-
-            Boolean success = true;
-
-            response.put("success", success);
-
-            return Response.status(200).entity(response).build();
-
-        } catch (Exception e) {
-            LOGGER.warn(e.toString(), e);
-
-            Boolean success = false;
-
-            response.put("success", success);
-
-            return Response.status(500).entity(response).build();
-
-        }
-
-    }
-
-    @POST
-    @Path("json_meta")
-    public Response json_meta(@Context HttpHeaders headers,
-                              String body) {
-
-        //log.debug(headers.toString(), headers);
-
-        //log.debug(body.toString(), body);
-
-        ObjectNode response = mapper.createObjectNode();
-
-        if (body.equalsIgnoreCase("{}")) {
-
-            Boolean success = true;
-
-            response.put("success", success);
-
-            return Response.status(200).entity(response).build();
-        }
-
-        try {
-
-            DatasiftWebhookData objectWrapper = mapper.readValue(body, 
DatasiftWebhookData.class);
-
-            for( Datasift item : objectWrapper.getInteractions()) {
-
-                String json = mapper.writeValueAsString(item);
-
-                StreamsDatum datum = new StreamsDatum(json);
-                if( item.getInteraction() != null &&
-                    !Strings.isNullOrEmpty(item.getInteraction().getId())) {
-                    datum.setId(item.getInteraction().getId());
-                }
-                if( item.getInteraction() != null &&
-                    item.getInteraction().getCreatedAt() != null) {
-                    datum.setTimestamp(item.getInteraction().getCreatedAt());
-                }
-                Map<String, Object> metadata = Maps.newHashMap();
-                metadata.put("hash", objectWrapper.getHash());
-                metadata.put("hashType", objectWrapper.getHashType());
-                metadata.put("id",objectWrapper.getId());
-
-                if( item.getInteraction() != null &&
-                        item.getInteraction().getTags() != null &&
-                        item.getInteraction().getTags().size() > 0) {
-                    metadata.put("tags", item.getInteraction().getTags());
-                }
-
-                datum.setMetadata(metadata);
-
-                lock.writeLock().lock();
-                ComponentUtils.offerUntilSuccess(datum, providerQueue);
-                lock.writeLock().unlock();
-            }
-
-            Boolean success = true;
-
-            response.put("success", success);
-
-            return Response.status(200).entity(response).build();
-
-        } catch (Exception e) {
-            LOGGER.warn(e.toString(), e);
-        }
-
-        return Response.status(500).build();
-    }
-
-    @Override
-    public void startStream() {
-        return;
-    }
-
-    @Override
-    public StreamsResultSet readCurrent() {
-
-        StreamsResultSet current;
-
-        lock.writeLock().lock();
-        current = new 
StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
-        providerQueue.clear();
-        lock.writeLock().unlock();
-
-        return current;
-
-    }
-
-    @Override
-    public StreamsResultSet readNew(BigInteger sequence) {
-        return null;
-    }
-
-    @Override
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        return null;
-    }
-
-    @Override
-    public boolean isRunning() {
-        return true;
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-
-    }
-
-    @Override
-    public void cleanUp() {
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java
 
b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java
deleted file mode 100644
index 6ec395d..0000000
--- 
a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.streams.datasift.provider;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
-import org.apache.streams.datasift.DatasiftConfiguration;
-import org.apache.streams.datasift.util.StreamsDatasiftMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- */
-public class DatasiftStreamConfigurator {
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(DatasiftStreamConfigurator.class);
-
-    private static final ObjectMapper MAPPER = 
StreamsDatasiftMapper.getInstance();
-
-    public static DatasiftConfiguration detectConfiguration(Config datasift) {
-
-        DatasiftConfiguration datasiftConfiguration = null;
-
-        try {
-            datasiftConfiguration = 
MAPPER.readValue(datasift.root().render(ConfigRenderOptions.concise()), 
DatasiftConfiguration.class);
-        } catch (Exception e) {
-            e.printStackTrace();
-            LOGGER.warn("Could not parse datasiftConfiguration");
-        }
-        return datasiftConfiguration;
-    }
-
-}


Reply via email to