Resolves STREAMS-416
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0b512d8b Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0b512d8b Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0b512d8b Branch: refs/heads/master Commit: 0b512d8b6c0d2b36a635d20b9be9a481eb67d6b9 Parents: 2c12724 Author: Elias Ponvert <[email protected]> Authored: Mon Oct 17 10:37:51 2016 -0500 Committer: Elias Ponvert <[email protected]> Committed: Mon Oct 17 10:37:51 2016 -0500 ---------------------------------------------------------------------- streams-contrib/pom.xml | 5 - .../streams-persist-cassandra/pom.xml | 168 ---- .../configuration/CassandraConfiguration.java | 81 -- .../model/CassandraActivityStreamsEntry.java | 45 - .../CassandraActivityStreamsRepository.java | 176 ---- .../repository/impl/CassandraKeyspace.java | 61 -- .../impl/CassandraSubscriptionRepository.java | 69 -- .../spring/streams-cassandra-context.xml | 25 - .../CassandraActivityStreamsRepositoryTest.java | 99 --- .../impl/CassandraActivitySubscriptionTest.java | 54 -- .../streams-provider-datasift/README.md | 18 - .../streams-provider-datasift/pom.xml | 242 ------ .../streams/datasift/csdl/DatasiftCsdlUtil.java | 132 --- .../DatasiftActivitySerializerProcessor.java | 97 --- .../DatasiftTypeConverterProcessor.java | 167 ---- .../datasift/provider/DatasiftConverter.java | 38 - .../provider/DatasiftManagedSourceSetup.java | 94 --- .../datasift/provider/DatasiftPushProvider.java | 270 ------ .../provider/DatasiftStreamConfigurator.java | 52 -- .../provider/DatasiftStreamProvider.java | 242 ------ .../streams/datasift/provider/ErrorHandler.java | 47 -- .../streams/datasift/provider/Subscription.java | 60 -- .../serializer/DatasiftActivitySerializer.java | 65 -- .../serializer/DatasiftEventClassifier.java | 53 -- .../DatasiftInstagramActivitySerializer.java | 125 --- .../DatasiftInteractionActivitySerializer.java | 247 ------ .../DatasiftTwitterActivitySerializer.java | 271 ------ .../datasift/util/StreamsDatasiftMapper.java | 89 -- .../org/apache/streams/datasift/Datasift.json | 462 ----------- .../streams/datasift/DatasiftConfiguration.json | 136 ---- .../datasift/DatasiftPushConfiguration.json | 20 - .../datasift/DatasiftStreamConfiguration.json | 20 - .../streams/datasift/DatasiftWebhookData.json | 35 - .../datasift/facebook/DatasiftFacebook.json | 125 --- .../datasift/instagram/DatasiftInstagram.json | 183 ----- .../interaction/DatasiftInteraction.json | 97 --- .../datasift/twitter/DatasiftTwitter.json | 370 --------- .../datasift/twitter/DatasiftTwitterMedia.json | 132 --- .../datasift/twitter/DatasiftTwitterUser.json | 73 -- .../src/main/resources/datasift.conf | 25 - .../src/site/markdown/index.md | 27 - .../com/datasift/test/DatasiftSerDeTest.java | 75 -- .../provider/DatasiftStreamProviderTest.java | 144 ---- .../datasift/provider/ErrorHandlerTest.java | 40 - .../datasift/provider/SubscriptionTest.java | 58 -- .../DatasiftActivitySerializerIT.java | 110 --- .../serializer/DatasiftEventClassifierTest.java | 71 -- .../DatasiftInstagramActivitySerializerIT.java | 57 -- ...DatasiftInteractionActivitySerializerIT.java | 62 -- .../DatasiftTwitterActivitySerializerIT.java | 57 -- .../gnip-edc-facebook/pom.xml | 152 ---- .../test/FacebookEDCAsActivityTest.java | 93 --- .../facebook/test/FacebookEDCSerDeTest.java | 74 -- .../gnip-edc-flickr/pom.xml | 139 ---- .../flickr/test/FlickrEDCAsActivityTest.java | 92 --- .../gnip/flickr/test/FlickrEDCSerDeTest.java | 76 -- .../gnip-edc-googleplus/pom.xml | 102 --- .../com/gplus/api/GPlusActivitySerializer.java | 93 --- .../com/gplus/api/GPlusEDCAsActivityTest.java | 95 --- .../gnip-edc-instagram/pom.xml | 116 --- .../jsonschema/com/instagram/Instagram.json | 208 ----- .../com/instagram/test/InstagramSerDeTest.java | 70 -- .../gnip-edc-reddit/pom.xml | 103 --- .../reddit/api/RedditActivitySerializer.java | 107 --- .../reddit/api/RedditEDCAsActivityJSONTest.java | 97 --- .../gnip-edc-youtube/pom.xml | 139 ---- .../java/com/gnip/test/YouTubeEDCSerDeTest.java | 79 -- .../com/gnip/test/YoutubeEDCAsActivityTest.java | 87 -- .../gnip-powertrack/README.md | 8 - .../gnip-powertrack/pom.xml | 207 ----- .../ActivityXMLActivitySerializer.java | 240 ------ .../gnip/powertrack/GnipActivityFixer.java | 151 ---- .../PowerTrackActivitySerializer.java | 121 --- .../src/main/jsonschema/com/gnip/Gnip.json | 815 ------------------- .../src/main/xmlschema/com/gnip/binding.xjb | 33 - .../src/main/xmlschema/com/gnip/entry.xsd | 398 --------- .../gnip-powertrack/src/site/markdown/index.md | 15 - .../test/PowerTrackDeserializationTest.java | 55 -- streams-contrib/streams-provider-gnip/pom.xml | 67 -- 79 files changed, 9503 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml index 8db46c1..b50f440 100644 --- a/streams-contrib/pom.xml +++ b/streams-contrib/pom.xml @@ -37,7 +37,6 @@ </properties> <modules> - <!--<module>streams-persist-cassandra</module>--> <module>streams-persist-console</module> <module>streams-persist-elasticsearch</module> <module>streams-persist-filebuffer</module> @@ -47,16 +46,12 @@ <module>streams-persist-kafka</module> <module>streams-persist-mongo</module> <module>streams-amazon-aws</module> - <!--<module>streams-processor-lucene</module>--> - <!--<module>streams-processor-tika</module>--> <module>streams-processor-jackson</module> <module>streams-processor-json</module> <module>streams-processor-urls</module> <module>streams-processor-peoplepattern</module> - <module>streams-provider-datasift</module> <module>streams-provider-facebook</module> <module>streams-provider-google</module> - <module>streams-provider-gnip</module> <module>streams-provider-instagram</module> <module>streams-provider-moreover</module> <module>streams-provider-twitter</module> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-persist-cassandra/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-cassandra/pom.xml b/streams-contrib/streams-persist-cassandra/pom.xml deleted file mode 100644 index 578004c..0000000 --- a/streams-contrib/streams-persist-cassandra/pom.xml +++ /dev/null @@ -1,168 +0,0 @@ -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, - ~ software distributed under the License is distributed on an - ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - ~ KIND, either express or implied. See the License for the - ~ specific language governing permissions and limitations - ~ under the License. - --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.streams</groupId> - <artifactId>streams-contrib</artifactId> - <version>0.4-incubating-SNAPSHOT</version> - </parent> - - <artifactId>streams-persist-cassandra</artifactId> - <name>${project.artifactId}</name> - - <description>Cassandra Module</description> - - <properties> - <bundle.symbolicName>streams-persist-cassandra</bundle.symbolicName> - <bundle.namespace>org.apache.streams</bundle.namespace> - <easymock.version>3.2</easymock.version> - </properties> - - <packaging>bundle</packaging> - <build> - <resources> - <resource> - <directory>src/main/resources</directory> - </resource> - - <resource> - <directory>.</directory> - <includes> - <include>plugin.xml</include> - <include>plugin.properties</include> - <include>icons/**</include> - </includes> - </resource> - </resources> - <plugins> - <plugin> - <groupId>org.ops4j</groupId> - <artifactId>maven-pax-plugin</artifactId> - <!-- - | enable improved OSGi compilation support for the bundle life-cycle. - | to switch back to the standard bundle life-cycle, move this setting - | down to the maven-bundle-plugin section - --> - <extensions>true</extensions> - </plugin> - <plugin> - <groupId>org.apache.felix</groupId> - <artifactId>maven-bundle-plugin</artifactId> - <version>1.4.3</version> - <!-- - | the following instructions build a simple set of public/private classes into an OSGi bundle - --> - <configuration> - <instructions> - <Bundle-SymbolicName>${bundle.symbolicName}</Bundle-SymbolicName> - <Bundle-Version>${project.version}</Bundle-Version> - <Export-Package> - ${bundle.namespace};version="${project.version}",org.apache.streams.cassandra.repository.impl, org.apache.streams.cassandra.model, org.apache.streams.cassandra.configuration - </Export-Package> - <Private-Package>${bundle.namespace}.cassandra.repository.impl.*,${bundle.namespace}.cassandra.model, ${bundle.namespace}.cassandra.configuration </Private-Package> - <Import-Package> - org.apache.rave.model,org.apache.rave.portal.model.impl, - com.datastax.driver.core, com.datastax.driver.core.exceptions, org.codehaus.jackson.map.annotate, - javax.persistence, org.apache.commons.logging, com.google.common.collect, org.codehaus.jackson.map, - org.apache.commons.lang, - org.apache.streams.osgi.components.activitysubscriber, - org.springframework.beans.factory.annotation, org.springframework.stereotype - </Import-Package> - </instructions> - </configuration> - </plugin> - </plugins> - </build> - - <dependencies> - <dependency> - <groupId>org.apache.rave</groupId> - <artifactId>rave-core-api</artifactId> - <version>${rave.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.rave</groupId> - <artifactId>rave-core</artifactId> - <version>${rave.version}</version> - <exclusions> - <exclusion> - <groupId>commons-logging</groupId> - <artifactId>commons-logging</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.8.2</version> - </dependency> - - <dependency> - <groupId>com.datastax.cassandra</groupId> - <artifactId>cassandra-driver-core</artifactId> - <version>${datastax.version}</version> - <exclusions> - <exclusion> - <groupId>commons-logging</groupId> - <artifactId>commons-logging</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.jboss.netty</groupId> - <artifactId>netty</artifactId> - <version>3.2.9.Final</version> - </dependency> - <dependency> - <groupId>org.apache.streams.osgi.components</groupId> - <artifactId>activity-subscriber</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.easymock</groupId> - <artifactId>easymock</artifactId> - <version>${easymock.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.streams</groupId> - <artifactId>streams-testing</artifactId> - <version>${project.version}</version> - <scope>test</scope> - <type>test-jar</type> - </dependency> - </dependencies> - -</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/configuration/CassandraConfiguration.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/configuration/CassandraConfiguration.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/configuration/CassandraConfiguration.java deleted file mode 100644 index 195467d..0000000 --- a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/configuration/CassandraConfiguration.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.cassandra.configuration; - -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Component; - -@Component -public class CassandraConfiguration { - @Value("${keyspaceName}") - private String keyspaceName; - - @Value("${activitystreamsColumnFamilyName}") - private String activitystreamsColumnFamilyName; - - @Value("${subscriptionColumnFamilyName}") - private String subscriptionColumnFamilyName; - - @Value("${publisherColumnFamilyName}") - private String publisherColumnFamilyName; - - @Value("${cassandraPort}") - private String cassandraPort; - - public String getKeyspaceName() { - return keyspaceName; - } - - public void setKeyspaceName(String keyspaceName) { - this.keyspaceName = keyspaceName; - } - - public String getActivitystreamsColumnFamilyName() { - return activitystreamsColumnFamilyName; - } - - public void setActivitystreamsColumnFamilyName(String activitystreamsColumnFamilyName) { - this.activitystreamsColumnFamilyName = activitystreamsColumnFamilyName; - } - - public String getSubscriptionColumnFamilyName() { - return subscriptionColumnFamilyName; - } - - public void setSubscriptionColumnFamilyName(String subscriptionColumnFamilyName) { - this.subscriptionColumnFamilyName = subscriptionColumnFamilyName; - } - - public String getPublisherColumnFamilyName() { - return publisherColumnFamilyName; - } - - public void setPublisherColumnFamilyName(String publisherColumnFamilyName) { - this.publisherColumnFamilyName = publisherColumnFamilyName; - } - - public String getCassandraPort() { - return cassandraPort; - } - - public void setCassandraPort(String cassandraPort) { - this.cassandraPort = cassandraPort; - } -} - http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java deleted file mode 100644 index 88db8aa..0000000 --- a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.cassandra.model; - -import org.apache.rave.model.ActivityStreamsObject; -import org.apache.rave.portal.model.impl.ActivityStreamsEntryImpl; -import org.apache.rave.portal.model.impl.ActivityStreamsObjectImpl; -import org.codehaus.jackson.map.annotate.JsonDeserialize; - -import java.util.Date; - -public class CassandraActivityStreamsEntry extends ActivityStreamsEntryImpl implements Comparable<CassandraActivityStreamsEntry>{ - - @JsonDeserialize(as=ActivityStreamsObjectImpl.class) - private ActivityStreamsObject object; - - @JsonDeserialize(as=ActivityStreamsObjectImpl.class) - private ActivityStreamsObject target; - - @JsonDeserialize(as=ActivityStreamsObjectImpl.class) - private ActivityStreamsObject actor; - - @JsonDeserialize(as=ActivityStreamsObjectImpl.class) - private ActivityStreamsObject provider; - - public int compareTo(CassandraActivityStreamsEntry entry){ - return (this.getPublished()).compareTo(entry.getPublished()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java deleted file mode 100644 index 56e5416..0000000 --- a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.cassandra.repository.impl; - -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.Row; -import com.datastax.driver.core.Session; -import com.datastax.driver.core.exceptions.AlreadyExistsException; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.rave.model.ActivityStreamsEntry; -import org.apache.rave.model.ActivityStreamsObject; -import org.apache.rave.portal.model.impl.ActivityStreamsEntryImpl; -import org.apache.rave.portal.model.impl.ActivityStreamsObjectImpl; -import org.apache.streams.cassandra.configuration.CassandraConfiguration; -import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry; -import org.springframework.beans.factory.annotation.Autowired; - -import java.util.ArrayList; -import java.util.Date; -import java.util.List; - - -public class CassandraActivityStreamsRepository { - - private static final Log LOG = LogFactory.getLog(CassandraActivityStreamsRepository.class); - - private CassandraKeyspace keyspace; - private CassandraConfiguration configuration; - - @Autowired - public CassandraActivityStreamsRepository(CassandraKeyspace keyspace, CassandraConfiguration configuration) { - this.configuration = configuration; - this.keyspace = keyspace; - - try { - keyspace.getSession().execute("CREATE TABLE " + configuration.getActivitystreamsColumnFamilyName() + " (" + - "id text, " + - "published timestamp, " + - "verb text, " + - "tags text, " + - - "actor_displayname text, " + - "actor_id text, " + - "actor_url text, " + - "actor_objecttype text, " + - - "target_displayname text, " + - "target_id text, " + - "target_url text, " + - - "provider_url text, " + - - "object_url text, " + - "object_displayname text, " + - "object_id text, " + - "object_objecttype text, " + - - "PRIMARY KEY (id, tags, published));"); - } catch (AlreadyExistsException ignored) { - } - } - - public void save(ActivityStreamsEntry entry) { - String sql = "INSERT INTO " + configuration.getActivitystreamsColumnFamilyName() + " (" + - "id, published, verb, tags, " + - "actor_displayname, actor_objecttype, actor_id, actor_url, " + - "target_displayname, target_id, target_url, " + - "provider_url, " + - "object_displayname, object_objecttype, object_id, object_url) " + - "VALUES ('" + - entry.getId() + "','" + - entry.getPublished().getTime() + "','" + - entry.getVerb() + "','" + - entry.getTags() + "','" + - - entry.getActor().getDisplayName() + "','" + - entry.getActor().getObjectType() + "','" + - entry.getActor().getId() + "','" + - entry.getActor().getUrl() + "','" + - - entry.getTarget().getDisplayName() + "','" + - entry.getTarget().getId() + "','" + - entry.getTarget().getUrl() + "','" + - - entry.getProvider().getUrl() + "','" + - - entry.getObject().getDisplayName() + "','" + - entry.getObject().getObjectType() + "','" + - entry.getObject().getId() + "','" + - entry.getObject().getUrl() + - - "')"; - keyspace.getSession().execute(sql); - } - - public List<CassandraActivityStreamsEntry> getActivitiesForFilters(List<String> filters, Date lastUpdated) { - List<CassandraActivityStreamsEntry> results = new ArrayList<CassandraActivityStreamsEntry>(); - - for (String tag : filters) { - String cql = "SELECT * FROM " + configuration.getActivitystreamsColumnFamilyName() + " WHERE "; - - //add filters - cql = cql + " tags = '" + tag + "' AND "; - - //specify last modified - cql = cql + "published > " + lastUpdated.getTime() + " ALLOW FILTERING"; - - //execute the cql query and store the results - ResultSet set = keyspace.getSession().execute(cql); - - //iterate through the results and create a new ActivityStreamsEntry for every result returned - - for (Row row : set) { - CassandraActivityStreamsEntry entry = new CassandraActivityStreamsEntry(); - ActivityStreamsObject actor = new ActivityStreamsObjectImpl(); - ActivityStreamsObject target = new ActivityStreamsObjectImpl(); - ActivityStreamsObject object = new ActivityStreamsObjectImpl(); - ActivityStreamsObject provider = new ActivityStreamsObjectImpl(); - - actor.setDisplayName(row.getString("actor_displayname")); - actor.setId(row.getString("actor_id")); - actor.setObjectType(row.getString("actor_objecttype")); - actor.setUrl(row.getString("actor_url")); - - target.setDisplayName(row.getString("target_displayname")); - target.setId(row.getString("target_id")); - target.setUrl(row.getString("target_url")); - - object.setDisplayName(row.getString("object_displayname")); - object.setObjectType(row.getString("object_objecttype")); - object.setUrl(row.getString("object_url")); - object.setId(row.getString("object_id")); - - provider.setUrl(row.getString("provider_url")); - - entry.setPublished(row.getDate("published")); - entry.setVerb(row.getString("verb")); - entry.setId(row.getString("id")); - entry.setTags(row.getString("tags")); - entry.setActor(actor); - entry.setTarget(target); - entry.setObject(object); - entry.setProvider(provider); - - results.add(entry); - } - } - - return results; - } - - public void dropTable(String table) { - String cql = "DROP TABLE " + table; - keyspace.getSession().execute(cql); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraKeyspace.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraKeyspace.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraKeyspace.java deleted file mode 100644 index 9106a51..0000000 --- a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraKeyspace.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.cassandra.repository.impl; - -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.Session; -import com.datastax.driver.core.exceptions.AlreadyExistsException; -import org.apache.streams.cassandra.configuration.CassandraConfiguration; -import org.springframework.beans.factory.annotation.Autowired; - -public class CassandraKeyspace { - private CassandraConfiguration configuration; - private Cluster cluster; - private Session session; - - @Autowired - public CassandraKeyspace(CassandraConfiguration configuration){ - this.configuration = configuration; - - cluster = Cluster.builder().addContactPoint(configuration.getCassandraPort()).build(); - session = cluster.connect(); - - //TODO: cassandra 2 will have support for CREATE KEYSPACE IF NOT EXISTS - try { - session.execute("CREATE KEYSPACE " + configuration.getKeyspaceName() + " WITH replication = { 'class': 'SimpleStrategy','replication_factor' : 1 };"); - } catch (AlreadyExistsException ignored) { - } - - //connect to the keyspace - session = cluster.connect(configuration.getKeyspaceName()); - } - - public Session getSession(){ - return session; - } - - @Override - protected void finalize() throws Throwable { - try { - cluster.shutdown(); - } finally { - super.finalize(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraSubscriptionRepository.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraSubscriptionRepository.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraSubscriptionRepository.java deleted file mode 100644 index f5fe471..0000000 --- a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraSubscriptionRepository.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.cassandra.repository.impl; - -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.exceptions.AlreadyExistsException; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.streams.cassandra.configuration.CassandraConfiguration; -import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscription; -import org.springframework.beans.factory.annotation.Autowired; - -public class CassandraSubscriptionRepository { - private static final Log LOG = LogFactory.getLog(CassandraSubscriptionRepository.class); - - private CassandraKeyspace keyspace; - private CassandraConfiguration configuration; - - @Autowired - public CassandraSubscriptionRepository(CassandraKeyspace keyspace, CassandraConfiguration configuration) { - this.keyspace = keyspace; - this.configuration = configuration; - - try { - keyspace.getSession().execute("CREATE TABLE " + configuration.getSubscriptionColumnFamilyName() + " (" + - "id text, " + - "filters text, " + - - "PRIMARY KEY (id));"); - } catch (AlreadyExistsException ignored) { - } - } - - public String getFilters(String id){ - String cql = "SELECT * FROM " + configuration.getSubscriptionColumnFamilyName() + " WHERE id = '" + id+"';"; - - ResultSet set = keyspace.getSession().execute(cql); - - return set.one().getString("filters"); - } - - public void save(ActivityStreamsSubscription subscription){ - String cql = "INSERT INTO " + configuration.getSubscriptionColumnFamilyName() + " (" + - "id, filters) " + - "VALUES ('" + - subscription.getAuthToken() + "','" + - StringUtils.join(subscription.getFilters(), " ") + - - "')"; - keyspace.getSession().execute(cql); - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-persist-cassandra/src/main/resources/META_INF/spring/streams-cassandra-context.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-cassandra/src/main/resources/META_INF/spring/streams-cassandra-context.xml b/streams-contrib/streams-persist-cassandra/src/main/resources/META_INF/spring/streams-cassandra-context.xml deleted file mode 100644 index 842c918..0000000 --- a/streams-contrib/streams-persist-cassandra/src/main/resources/META_INF/spring/streams-cassandra-context.xml +++ /dev/null @@ -1,25 +0,0 @@ -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, - ~ software distributed under the License is distributed on an - ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - ~ KIND, either express or implied. See the License for the - ~ specific language governing permissions and limitations - ~ under the License. - --> -<beans - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xmlns="http://www.springframework.org/schema/beans" - xmlns:context="http://www.springframework.org/schema/context" - xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd - http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> - -</beans> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java deleted file mode 100644 index 978af10..0000000 --- a/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.cassandra.repository.impl; - -import com.datastax.driver.core.ResultSet; -import org.apache.rave.model.ActivityStreamsEntry; -import org.apache.rave.model.ActivityStreamsObject; -import org.apache.rave.portal.model.impl.ActivityStreamsEntryImpl; -import org.apache.rave.portal.model.impl.ActivityStreamsObjectImpl; -import org.apache.streams.cassandra.configuration.CassandraConfiguration; -import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry; -import static org.easymock.EasyMock.*; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -import java.util.Arrays; -import java.util.Date; -import java.util.List; - -public class CassandraActivityStreamsRepositoryTest { - - private CassandraActivityStreamsRepository repository; - - - @Before - public void setup() { - CassandraKeyspace keyspace = createMock(CassandraKeyspace.class); - CassandraConfiguration configuration = createMock(CassandraConfiguration.class); - repository = new CassandraActivityStreamsRepository(keyspace, configuration); - } - - @Ignore - @Test - public void saveActivity() { - ActivityStreamsEntry entry = new ActivityStreamsEntryImpl(); - ActivityStreamsObject actor = new ActivityStreamsObjectImpl(); - ActivityStreamsObject target = new ActivityStreamsObjectImpl(); - ActivityStreamsObject object = new ActivityStreamsObjectImpl(); - ActivityStreamsObject provider = new ActivityStreamsObjectImpl(); - - actor.setId("actorid1"); - actor.setUrl("actorurl1"); - actor.setDisplayName("actorname1"); - - target.setId("targetid1"); - target.setUrl("targeturl1"); - target.setDisplayName("r501"); - - provider.setUrl("providerurl"); - - object.setId("objectid1"); - object.setDisplayName("objectname1"); - - entry.setId("dink"); - entry.setVerb("verb1"); - entry.setTags("r501"); - entry.setProvider(provider); - Date d = new Date(); - entry.setPublished(d); - entry.setActor(actor); - entry.setObject(object); - entry.setTarget(target); - - repository.save(entry); - } - - @Ignore - @Test - public void getActivity() { - String cql = "tags"; - String other = "r501"; - List<String> f = Arrays.asList(cql, other); - Date d = new Date(0); - List<CassandraActivityStreamsEntry> results = repository.getActivitiesForFilters(f,d); - } - - @Ignore - @Test - public void dropTableTest(){ - repository.dropTable("coltest"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivitySubscriptionTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivitySubscriptionTest.java b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivitySubscriptionTest.java deleted file mode 100644 index 2a90462..0000000 --- a/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivitySubscriptionTest.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.cassandra.repository.impl; - -import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscription; -import org.apache.streams.osgi.components.activitysubscriber.impl.ActivityStreamsSubscriptionImpl; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -import java.util.Arrays; - -public class CassandraActivitySubscriptionTest { - - public CassandraSubscriptionRepository repository; - - - @Before - public void setup() { -// repository = new CassandraSubscriptionRepository(); - } - - @Ignore - @Test - public void saveTest(){ - ActivityStreamsSubscription subscription = new ActivityStreamsSubscriptionImpl(); - subscription.setFilters(Arrays.asList("thisis", "atest")); - subscription.setAuthToken("subid"); - - repository.save(subscription); - } - - @Ignore - @Test - public void getTest(){ - String filters = repository.getFilters("subid"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-provider-datasift/README.md ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/README.md b/streams-contrib/streams-provider-datasift/README.md deleted file mode 100644 index a29fb16..0000000 --- a/streams-contrib/streams-provider-datasift/README.md +++ /dev/null @@ -1,18 +0,0 @@ -streams-provider-datasift -===================== - -Datasift Provider - -Example configuration: - - datasift { - apiKey = "" - userName = "" - hashes = [ - "b8aaf7cec5faa2fadbd55d651933a31e", - "f41f054e2a2ba8d2e7b0d74f56e727d6" - ] - } - - - http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-provider-datasift/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/pom.xml b/streams-contrib/streams-provider-datasift/pom.xml deleted file mode 100644 index def7826..0000000 --- a/streams-contrib/streams-provider-datasift/pom.xml +++ /dev/null @@ -1,242 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, - ~ software distributed under the License is distributed on an - ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - ~ KIND, either express or implied. See the License for the - ~ specific language governing permissions and limitations - ~ under the License. - --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <groupId>org.apache.streams</groupId> - <artifactId>streams-contrib</artifactId> - <version>0.4-incubating-SNAPSHOT</version> - </parent> - <modelVersion>4.0.0</modelVersion> - - <artifactId>streams-provider-datasift</artifactId> - <name>${project.artifactId}</name> - - <description>Datasift Provider</description> - - <properties> - <skipITs>true</skipITs> - <testDataBaseURl>http://streams.peoplepattern.com.s3.amazonaws.com/test-data/</testDataBaseURl> - </properties> - - - <dependencies> - <dependency> - <groupId>com.typesafe</groupId> - <artifactId>config</artifactId> - </dependency> - <dependency> - <groupId>org.apache.streams</groupId> - <artifactId>streams-core</artifactId> - </dependency> - <dependency> - <groupId>org.apache.streams</groupId> - <artifactId>streams-config</artifactId> - </dependency> - <dependency> - <groupId>org.apache.streams</groupId> - <artifactId>streams-pojo</artifactId> - </dependency> - <dependency> - <groupId>org.apache.streams</groupId> - <artifactId>streams-processor-jackson</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.streams</groupId> - <artifactId>streams-provider-twitter</artifactId> - <version>${project.version}</version> - <optional>true</optional> - </dependency> - <dependency> - <groupId>org.apache.streams</groupId> - <artifactId>streams-provider-instagram</artifactId> - <version>${project.version}</version> - <optional>true</optional> - </dependency> - <dependency> - <groupId>com.datasift.client</groupId> - <artifactId>datasift-java</artifactId> - <version>3.2.6</version> - <exclusions> - <exclusion> - <artifactId>slf4j-log4j12</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - <exclusion> - <artifactId>log4j</artifactId> - <groupId>log4j</groupId> - </exclusion> - <exclusion> - <artifactId>commons-logging</artifactId> - <groupId>commons-logging</groupId> - </exclusion> - <exclusion> - <groupId>com.boundary</groupId> - <artifactId>high-scale-lib</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>com.boundary</groupId> - <artifactId>high-scale-lib</artifactId> - <version>1.0.6</version> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>jcl-over-slf4j</artifactId> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-annotations</artifactId> - </dependency> - <dependency> - <groupId>org.jsonschema2pojo</groupId> - <artifactId>jsonschema2pojo-core</artifactId> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-all</artifactId> - <version>1.9.5</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.streams</groupId> - <artifactId>streams-util</artifactId> - <type>test-jar</type> - <scope>test</scope> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.streams</groupId> - <artifactId>streams-testing</artifactId> - <version>${project.version}</version> - <scope>test</scope> - <type>test-jar</type> - </dependency> - </dependencies> - - <build> - <sourceDirectory>src/main/java</sourceDirectory> - <testSourceDirectory>src/test/java17</testSourceDirectory> - <resources> - <resource> - <directory>src/main/resources</directory> - </resource> - </resources> - <testResources> - <testResource> - <directory>src/test/resources</directory> - </testResource> - </testResources> - <plugins> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <executions> - <execution> - <id>add-source</id> - <phase>generate-sources</phase> - <goals> - <goal>add-source</goal> - </goals> - <configuration> - <sources> - <source>target/generated-sources/jsonschema2pojo</source> - </sources> - </configuration> - </execution> - <execution> - <id>add-test-source</id> - <phase>generate-sources</phase> - <goals> - <goal>add-test-source</goal> - </goals> - <configuration> - <sources> - <source>src/test/java17</source> - </sources> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.jsonschema2pojo</groupId> - <artifactId>jsonschema2pojo-maven-plugin</artifactId> - <configuration> - <addCompileSourceRoot>true</addCompileSourceRoot> - <generateBuilders>true</generateBuilders> - <sourceDirectory>${project.basedir}/src/main/jsonschema</sourceDirectory> - <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory> - <targetPackage>org.apache.streams.datasift</targetPackage> - <useLongIntegers>true</useLongIntegers> - <useJodaDates>true</useJodaDates> - </configuration> - <executions> - <execution> - <goals> - <goal>generate</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <groupId>com.googlecode.maven-download-plugin</groupId> - <artifactId>download-maven-plugin</artifactId> - <version>1.2.1</version> - <executions> - <execution> - <id>download-it-data</id> - <phase>pre-integration-test</phase> - <goals> - <goal>wget</goal> - </goals> - <configuration> - <url>${testDataBaseURl}/${project.artifactId}.zip</url> - <unpack>true</unpack> - <outputDirectory>${project.build.directory}/test-classes</outputDirectory> - <!--<md5>df65b5642f33676313ebe4d5b69a3fff</md5>--> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-failsafe-plugin</artifactId> - <configuration> - <skipTests>${skipITs}</skipTests> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-resources-plugin</artifactId> - </plugin> - - </plugins> - - </build> -</project> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/csdl/DatasiftCsdlUtil.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/csdl/DatasiftCsdlUtil.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/csdl/DatasiftCsdlUtil.java deleted file mode 100644 index 049ed8c..0000000 --- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/csdl/DatasiftCsdlUtil.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.datasift.csdl; - -import com.google.common.collect.Lists; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.ListIterator; - -public class DatasiftCsdlUtil { - - private static final Logger log = LoggerFactory - .getLogger(DatasiftCsdlUtil.class); - - public static String csdlFromTwitterUserIds(List<String> list) throws Exception { - - StringBuilder csdlBuilder = new StringBuilder(); - - csdlBuilder.append("twitter.user.id in ["); - ListIterator<String> listIterator = Lists.newArrayList(list).listIterator(); - while( listIterator.hasNext() ) { - csdlBuilder.append(listIterator.next()); - if (listIterator.hasNext()) - csdlBuilder.append(","); - } - csdlBuilder.append("]\n"); - csdlBuilder.append(" OR\n"); - csdlBuilder.append("twitter.in_reply_to_user_id contains_any \""); - listIterator = Lists.newArrayList(list).listIterator(); - while( listIterator.hasNext() ) { - csdlBuilder.append(listIterator.next()); - if (listIterator.hasNext()) - csdlBuilder.append(","); - } - csdlBuilder.append("\"\n"); - csdlBuilder.append(" OR\n"); - csdlBuilder.append("twitter.mention_ids in ["); - listIterator = Lists.newArrayList(list).listIterator(); - while( listIterator.hasNext() ) { - csdlBuilder.append(listIterator.next()); - if (listIterator.hasNext()) - csdlBuilder.append(","); - } - csdlBuilder.append("]\n"); - - log.debug(csdlBuilder.toString()); - - return csdlBuilder.toString(); - } - - public static String csdlFromTwitterUserNames(List<String> list) throws Exception { - - StringBuilder csdlBuilder = new StringBuilder(); - - csdlBuilder.append("twitter.user.screen_name contains_any \""); - ListIterator<String> listIterator = Lists.newArrayList(list).listIterator(); - while( listIterator.hasNext() ) { - csdlBuilder.append(listIterator.next()); - if (listIterator.hasNext()) - csdlBuilder.append(","); - } - csdlBuilder.append("\"\n"); - csdlBuilder.append(" OR\n"); - csdlBuilder.append("twitter.in_reply_to_screen_name contains_any \""); - listIterator = Lists.newArrayList(list).listIterator(); - while( listIterator.hasNext() ) { - csdlBuilder.append(listIterator.next()); - if (listIterator.hasNext()) - csdlBuilder.append(","); - } - csdlBuilder.append("\"\n"); - csdlBuilder.append(" OR\n"); - csdlBuilder.append("twitter.mentions contains_any \""); - listIterator = Lists.newArrayList(list).listIterator(); - while( listIterator.hasNext() ) { - csdlBuilder.append(listIterator.next()); - if (listIterator.hasNext()) - csdlBuilder.append(","); - } - csdlBuilder.append("\"\n"); - - log.debug(csdlBuilder.toString()); - - return csdlBuilder.toString(); - } - - public static String csdlFromKeywords(List<String> include, List<String> exclude) throws Exception { - - StringBuilder csdlBuilder = new StringBuilder(); - - csdlBuilder.append("interaction.content contains_any \""); - ListIterator<String> listIterator = Lists.newArrayList(include).listIterator(); - while( listIterator.hasNext() ) { - csdlBuilder.append(listIterator.next()); - if (listIterator.hasNext()) - csdlBuilder.append(","); - } - csdlBuilder.append("\"\n"); - csdlBuilder.append(" AND NOT ( \n"); - csdlBuilder.append("interaction.content \""); - listIterator = Lists.newArrayList(exclude).listIterator(); - while( listIterator.hasNext() ) { - csdlBuilder.append(listIterator.next()); - if (listIterator.hasNext()) - csdlBuilder.append(","); - } - csdlBuilder.append("\"\n"); - csdlBuilder.append(")\n"); - - log.debug(csdlBuilder.toString()); - - return csdlBuilder.toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftActivitySerializerProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftActivitySerializerProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftActivitySerializerProcessor.java deleted file mode 100644 index 43b16b2..0000000 --- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftActivitySerializerProcessor.java +++ /dev/null @@ -1,97 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package org.apache.streams.datasift.processor; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.datasift.Datasift; -import org.apache.streams.datasift.provider.DatasiftConverter; -import org.apache.streams.datasift.serializer.DatasiftActivitySerializer; -import org.apache.streams.datasift.util.StreamsDatasiftMapper; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.pojo.json.Activity; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -/** - * - */ -public class DatasiftActivitySerializerProcessor implements StreamsProcessor { - - private final static String STREAMS_ID = "DatasiftActivitySerializerProcessor"; - - private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftActivitySerializerProcessor.class); - - private ObjectMapper mapper; - private Class outClass; - private DatasiftActivitySerializer datasiftActivitySerializer; - - public final static String TERMINATE = new String("TERMINATE"); - - public DatasiftActivitySerializerProcessor(Class outClass) { - this.outClass = outClass; - } - - @Override - public String getId() { - return STREAMS_ID; - } - - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - List<StreamsDatum> result = Lists.newLinkedList(); - Activity activity; - try { - Datasift node; - if( entry.getDocument() instanceof String ) { - node = this.mapper.readValue((String)entry.getDocument(), Datasift.class); - } else if( entry.getDocument() instanceof Datasift ) { - node = (Datasift) entry.getDocument(); - } else { - node = this.mapper.convertValue(entry.getDocument(), Datasift.class); - } - if(node != null) { - activity = this.datasiftActivitySerializer.deserialize(node); - StreamsDatum datum = new StreamsDatum(activity, entry.getId(), entry.getTimestamp(), entry.getSequenceid()); - datum.setMetadata(entry.getMetadata()); - result.add(datum); - } - } catch (Exception e) { - LOGGER.error("Exception converting Datasift Interaction to "+this.outClass.getName()+ " : {}", e); - } - return result; - } - - @Override - public void prepare(Object configurationObject) { - this.mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsDatasiftMapper.DATASIFT_FORMAT)); - this.datasiftActivitySerializer = new DatasiftActivitySerializer(); - } - - @Override - public void cleanUp() { - - } - -}; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java deleted file mode 100644 index dcffd1a..0000000 --- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java +++ /dev/null @@ -1,167 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package org.apache.streams.datasift.processor; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.datasift.Datasift; -import org.apache.streams.datasift.provider.DatasiftConverter; -import org.apache.streams.datasift.serializer.DatasiftActivitySerializer; -import org.apache.streams.datasift.util.StreamsDatasiftMapper; -import org.apache.streams.pojo.json.Activity; -import org.apache.streams.jackson.CleanAdditionalPropertiesProcessor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -/** - * - */ -public class DatasiftTypeConverterProcessor implements StreamsProcessor { - - private final static String STREAMS_ID = "RegexUrlExtractor"; - - private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftTypeConverterProcessor.class); - - private ObjectMapper mapper; - private Class outClass; - private DatasiftActivitySerializer datasiftInteractionActivitySerializer; - private DatasiftConverter converter; - - public final static String TERMINATE = new String("TERMINATE"); - - public DatasiftTypeConverterProcessor(Class outClass) { - this.outClass = outClass; - } - - @Override - public String getId() { - return STREAMS_ID; - } - - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - List<StreamsDatum> result = Lists.newLinkedList(); - Object doc; - try { - if( entry.getDocument() instanceof String ) { - ObjectNode node = this.mapper.readValue((String)entry.getDocument(), ObjectNode.class); - doc = this.converter.convert(node, this.mapper); - } else { - doc = this.converter.convert(entry.getDocument(), this.mapper); - } - if(doc != null) { - result.add(new StreamsDatum(doc, entry.getId())); - } - } catch (Exception e) { - LOGGER.error("Exception converting Datasift Interaction to "+this.outClass.getName()+ " : {}", e); - } - return result; - } - - @Override - public void prepare(Object configurationObject) { - this.mapper = StreamsDatasiftMapper.getInstance(); - this.datasiftInteractionActivitySerializer = new DatasiftActivitySerializer(); - if(this.outClass.equals(Activity.class)) { - this.converter = new ActivityConverter(); - } else if (this.outClass.equals(String.class)) { - this.converter = new StringConverter(); - } else { - LOGGER.warn("Using defaulting datasift converter"); - this.converter = new DefaultConverter(this.outClass); - } - } - - @Override - public void cleanUp() { - - } - - private class ActivityConverter implements DatasiftConverter { - - @Override - public Object convert(Object toConvert, ObjectMapper mapper) { - if(toConvert instanceof Activity) - return toConvert; - try { - if(toConvert instanceof String) - return datasiftInteractionActivitySerializer.deserialize((String) toConvert); - return mapper.convertValue(toConvert, Activity.class); - } catch (Exception e) { - LOGGER.error("Exception while trying to convert {} to a Activity.", toConvert.getClass()); - LOGGER.error("Exception : {}", e); - e.printStackTrace(); - return null; - } - } - - - } - - private class StringConverter implements DatasiftConverter { - @Override - public Object convert(Object toConvert, ObjectMapper mapper) { - try { - if(toConvert instanceof String){ - return mapper.writeValueAsString(mapper.readValue((String) toConvert, Datasift.class)); - } else { - if(toConvert.getClass().equals(Activity.class)) { //hack to remove additional properties - ObjectNode node = mapper.convertValue(toConvert, ObjectNode.class); - CleanAdditionalPropertiesProcessor.cleanAdditionalProperties(node); - return mapper.writeValueAsString(node); - } else - return mapper.writeValueAsString(toConvert); - } - } catch (Exception e) { - LOGGER.error("Exception while trying to write {} as a String.", toConvert.getClass()); - LOGGER.error("Exception : {}", e); - return null; - } - } - } - - private class DefaultConverter implements DatasiftConverter { - - private Class clazz; - - public DefaultConverter(Class clazz) { - this.clazz = clazz; - } - - @Override - public Object convert(Object toConvert, ObjectMapper mapper) { - try { - if(toConvert instanceof String) { - return mapper.readValue((String) toConvert, this.clazz); - } else { - return mapper.convertValue(toConvert, this.clazz); - } - - } catch (Exception e) { - throw new RuntimeException("Failed converting +"+ toConvert.getClass().getName()+" to "+ this.clazz.getName()); - } - } - } -}; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftConverter.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftConverter.java deleted file mode 100644 index f978205..0000000 --- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftConverter.java +++ /dev/null @@ -1,38 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package org.apache.streams.datasift.provider; - -import com.fasterxml.jackson.databind.ObjectMapper; - -/** - * Converts a {@link org.apache.streams.datasift.Datasift} object to a StreamsDatum - */ -public interface DatasiftConverter { - - /** - * Converts a datasift related object to the desired resulting object. - * @param toConvert - * @param mapper - * @return - */ - public Object convert(Object toConvert, ObjectMapper mapper); - - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftManagedSourceSetup.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftManagedSourceSetup.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftManagedSourceSetup.java deleted file mode 100644 index 8200ce2..0000000 --- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftManagedSourceSetup.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.datasift.provider; - -import com.datasift.client.DataSiftClient; -import com.datasift.client.managedsource.ManagedSource; -import com.datasift.client.managedsource.ManagedSourceList; -import com.datasift.client.managedsource.sources.DataSource; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Maps; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.datasift.DatasiftConfiguration; -import org.apache.streams.datasift.managed.StreamsManagedSource; -import org.apache.streams.datasift.util.StreamsDatasiftMapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -/** - * Created by sblackmon on 8/8/14. - */ -public class DatasiftManagedSourceSetup implements Runnable { - - private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftStreamProvider.class); - - private static DatasiftConfiguration config = DatasiftStreamConfigurator.detectConfiguration(StreamsConfigurator.config); - - private static final ObjectMapper MAPPER = StreamsDatasiftMapper.getInstance(); - - DataSiftClient client; - Map<String, ManagedSource> currentManagedSourceMap = Maps.newHashMap(); - List<StreamsManagedSource> updatedManagedSourceList; - - public static void main(String[] args) { - DatasiftManagedSourceSetup job = new DatasiftManagedSourceSetup(); - (new Thread(job)).start(); - } - - @Override - public void run() { - - setup(); - - current(); - - updatedManagedSourceList = config.getManagedSources(); - - for( StreamsManagedSource source : updatedManagedSourceList ) { - ManagedSource current = currentManagedSourceMap.get( source.getId() ); - LOGGER.info( "CURRENT: " + current ); - // merge 'em - ManagedSource working = MAPPER.convertValue(source, ManagedSource.class); - LOGGER.info( "WORKING: " + working ); - ManagedSource updated = client.managedSource().update(current.getName(), (DataSource) working, current).sync(); - LOGGER.info( "UPDATED: " + updated ); - - } - - } - - public void setup() { - - client = new DatasiftStreamProvider(null, config).getNewClient(config.getUserName(), config.getApiKey()); - } - - public void current() { - ManagedSourceList managedSources = client.managedSource().get().sync(); - Iterator<ManagedSource> managedSourceIterator = managedSources.iterator(); - while( managedSourceIterator.hasNext() ) { - ManagedSource source = managedSourceIterator.next(); - currentManagedSourceMap.put(source.getId(), source); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java deleted file mode 100644 index bdd2c97..0000000 --- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java +++ /dev/null @@ -1,270 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package org.apache.streams.datasift.provider; - -import com.datasift.client.stream.DeletedInteraction; -import com.datasift.client.stream.StreamEventListener; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; -import com.google.common.base.Strings; -import com.google.common.collect.Maps; -import com.google.common.collect.Queues; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProvider; -import org.apache.streams.core.StreamsResultSet; -import org.apache.streams.datasift.Datasift; -import org.apache.streams.datasift.DatasiftConfiguration; -import org.apache.streams.datasift.DatasiftWebhookData; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.util.ComponentUtils; -import org.joda.time.DateTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Resource; -import javax.ws.rs.Consumes; -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.HttpHeaders; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import java.math.BigInteger; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.regex.Pattern; - -/** - * {@code DatasiftPushProvider} is an implementation of the {@link org.apache.streams.core.StreamsProvider} interface, with - * annotations that allow it to bind as jersey resources within streams-runtime-dropwizard. - * - * Whereas GenericWebhookResource outputs ObjectNode datums, DatasiftPushProvider outputs Datasift datums, with - * metadata when the json_meta endpoint is used. - */ -@Resource -@Path("/streams/webhooks/datasift") -@Produces(MediaType.APPLICATION_JSON) -@Consumes(MediaType.APPLICATION_JSON) -public class DatasiftPushProvider implements StreamsProvider { - - private final static String STREAMS_ID = "DatasiftPushProvider"; - - private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftPushProvider.class); - - private static ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - - protected Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<>(); - - protected final ReadWriteLock lock = new ReentrantReadWriteLock(); - - private static Pattern newLinePattern = Pattern.compile("(\\r\\n?|\\n)", Pattern.MULTILINE); - - @Override - public String getId() { - return STREAMS_ID; - } - - @POST - @Path("json") - public Response json(@Context HttpHeaders headers, - String body) { - - ObjectNode response = mapper.createObjectNode(); - - StreamsDatum datum = new StreamsDatum(body); - - lock.writeLock().lock(); - ComponentUtils.offerUntilSuccess(datum, providerQueue); - lock.writeLock().unlock(); - - Boolean success = true; - - response.put("success", success); - - return Response.status(200).entity(response).build(); - - } - - @POST - @Path("json_new_line") - public Response json_new_line(@Context HttpHeaders headers, - String body) { - - ObjectNode response = mapper.createObjectNode(); - - if (body.equalsIgnoreCase("{}")) { - - Boolean success = true; - - response.put("success", success); - - return Response.status(200).entity(response).build(); - } - - try { - - for( String item : Splitter.on(newLinePattern).split(body)) { - StreamsDatum datum = new StreamsDatum(item); - - lock.writeLock().lock(); - ComponentUtils.offerUntilSuccess(datum, providerQueue); - lock.writeLock().unlock(); - - } - - Boolean success = true; - - response.put("success", success); - - return Response.status(200).entity(response).build(); - - } catch (Exception e) { - LOGGER.warn(e.toString(), e); - - Boolean success = false; - - response.put("success", success); - - return Response.status(500).entity(response).build(); - - } - - } - - @POST - @Path("json_meta") - public Response json_meta(@Context HttpHeaders headers, - String body) { - - //log.debug(headers.toString(), headers); - - //log.debug(body.toString(), body); - - ObjectNode response = mapper.createObjectNode(); - - if (body.equalsIgnoreCase("{}")) { - - Boolean success = true; - - response.put("success", success); - - return Response.status(200).entity(response).build(); - } - - try { - - DatasiftWebhookData objectWrapper = mapper.readValue(body, DatasiftWebhookData.class); - - for( Datasift item : objectWrapper.getInteractions()) { - - String json = mapper.writeValueAsString(item); - - StreamsDatum datum = new StreamsDatum(json); - if( item.getInteraction() != null && - !Strings.isNullOrEmpty(item.getInteraction().getId())) { - datum.setId(item.getInteraction().getId()); - } - if( item.getInteraction() != null && - item.getInteraction().getCreatedAt() != null) { - datum.setTimestamp(item.getInteraction().getCreatedAt()); - } - Map<String, Object> metadata = Maps.newHashMap(); - metadata.put("hash", objectWrapper.getHash()); - metadata.put("hashType", objectWrapper.getHashType()); - metadata.put("id",objectWrapper.getId()); - - if( item.getInteraction() != null && - item.getInteraction().getTags() != null && - item.getInteraction().getTags().size() > 0) { - metadata.put("tags", item.getInteraction().getTags()); - } - - datum.setMetadata(metadata); - - lock.writeLock().lock(); - ComponentUtils.offerUntilSuccess(datum, providerQueue); - lock.writeLock().unlock(); - } - - Boolean success = true; - - response.put("success", success); - - return Response.status(200).entity(response).build(); - - } catch (Exception e) { - LOGGER.warn(e.toString(), e); - } - - return Response.status(500).build(); - } - - @Override - public void startStream() { - return; - } - - @Override - public StreamsResultSet readCurrent() { - - StreamsResultSet current; - - lock.writeLock().lock(); - current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue)); - providerQueue.clear(); - lock.writeLock().unlock(); - - return current; - - } - - @Override - public StreamsResultSet readNew(BigInteger sequence) { - return null; - } - - @Override - public StreamsResultSet readRange(DateTime start, DateTime end) { - return null; - } - - @Override - public boolean isRunning() { - return true; - } - - @Override - public void prepare(Object configurationObject) { - - } - - @Override - public void cleanUp() { - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java deleted file mode 100644 index 6ec395d..0000000 --- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java +++ /dev/null @@ -1,52 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package org.apache.streams.datasift.provider; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigRenderOptions; -import org.apache.streams.datasift.DatasiftConfiguration; -import org.apache.streams.datasift.util.StreamsDatasiftMapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * - */ -public class DatasiftStreamConfigurator { - - private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftStreamConfigurator.class); - - private static final ObjectMapper MAPPER = StreamsDatasiftMapper.getInstance(); - - public static DatasiftConfiguration detectConfiguration(Config datasift) { - - DatasiftConfiguration datasiftConfiguration = null; - - try { - datasiftConfiguration = MAPPER.readValue(datasift.root().render(ConfigRenderOptions.concise()), DatasiftConfiguration.class); - } catch (Exception e) { - e.printStackTrace(); - LOGGER.warn("Could not parse datasiftConfiguration"); - } - return datasiftConfiguration; - } - -}
