committing PeoplePattern processor
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9a575322 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9a575322 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9a575322 Branch: refs/heads/STREAMS-49 Commit: 9a575322231e4a8a69c56f06da743ffe3211ccb4 Parents: d62061d Author: Steve Blackmon <[email protected]> Authored: Sun Oct 12 20:20:52 2014 -0500 Committer: Steve Blackmon <[email protected]> Committed: Sun Oct 12 20:20:52 2014 -0500 ---------------------------------------------------------------------- streams-contrib/pom.xml | 5 +- .../streams-processor-peoplepattern/pom.xml | 138 +++++++++++++++++++ .../peoplepattern/AccountTypeProcessor.java | 76 ++++++++++ .../peoplepattern/DemographicsProcessor.java | 77 +++++++++++ .../streams/peoplepattern/AccountType.json | 27 ++++ .../streams/peoplepattern/Demographics.json | 60 ++++++++ .../resources/templates/peoplepatternactor.json | 25 ++++ 7 files changed, 406 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a575322/streams-contrib/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml index e290466..fcec297 100644 --- a/streams-contrib/pom.xml +++ b/streams-contrib/pom.xml @@ -44,17 +44,18 @@ <module>streams-persist-hdfs</module> <module>streams-persist-kafka</module> <module>streams-persist-mongo</module> - <module>streams-amazon-aws</module> + <module>streams-amazon-aws</module> <!--<module>streams-processor-lucene</module>--> <!--<module>streams-processor-tika</module>--> - <module>streams-provider-instagram</module> <module>streams-processor-jackson</module> <module>streams-processor-json</module> <module>streams-processor-urls</module> + <module>streams-processor-peoplepattern</module> <module>streams-provider-datasift</module> <module>streams-provider-facebook</module> <module>streams-provider-google</module> <module>streams-provider-gnip</module> + <module>streams-provider-instagram</module> <module>streams-provider-moreover</module> <module>streams-provider-twitter</module> <module>streams-provider-sysomos</module> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a575322/streams-contrib/streams-processor-peoplepattern/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-peoplepattern/pom.xml b/streams-contrib/streams-processor-peoplepattern/pom.xml new file mode 100644 index 0000000..b810200 --- /dev/null +++ b/streams-contrib/streams-processor-peoplepattern/pom.xml @@ -0,0 +1,138 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + <artifactId>streams-processor-peoplepattern</artifactId> + <version>0.1-SNAPSHOT</version> + + <parent> + <groupId>org.apache.streams</groupId> + <artifactId>streams-contrib</artifactId> + <version>0.1-SNAPSHOT</version> + </parent> + + <dependencies> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-config</artifactId> + </dependency> + <dependency> + <groupId>com.typesafe</groupId> + <artifactId>config</artifactId> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-pojo</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-http</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + </dependencies> + + <build> + <sourceDirectory>src/main/java</sourceDirectory> + <testSourceDirectory>src/test/java</testSourceDirectory> + <resources> + <resource> + <directory>src/main/resources</directory> + </resource> + </resources> + <testResources> + <testResource> + <directory>src/test/resources</directory> + </testResource> + </testResources> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <executions> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>target/generated-sources/jsonschema2pojo/**/*.java</source> + </sources> + </configuration> + </execution> + <execution> + <id>add-source-jaxb2</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>target/generated-sources/jaxb2</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.jsonschema2pojo</groupId> + <artifactId>jsonschema2pojo-maven-plugin</artifactId> + <configuration> + <addCompileSourceRoot>true</addCompileSourceRoot> + <generateBuilders>true</generateBuilders> + <sourcePaths> + <sourcePath>src/main/jsonschema/org/apache/streams/peoplepattern</sourcePath> + </sourcePaths> + <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory> + <targetPackage>org.apache.streams.peoplepattern</targetPackage> + <useLongIntegers>true</useLongIntegers> + <useJodaDates>true</useJodaDates> + </configuration> + <executions> + <execution> + <goals> + <goal>generate</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a575322/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java new file mode 100644 index 0000000..d180b7f --- /dev/null +++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.peoplepattern; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Maps; +import org.apache.streams.components.http.HttpConfigurator; +import org.apache.streams.components.http.HttpProcessorConfiguration; +import org.apache.streams.components.http.processor.SimpleHTTPGetProcessor; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.data.util.ExtensionUtil; +import org.apache.streams.pojo.json.Activity; +import org.apache.streams.pojo.json.Actor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * Enrich actor with demographics + */ +public class AccountTypeProcessor extends SimpleHTTPGetProcessor { + + private final static String STREAMS_ID = "AccountTypeProcessor"; + + private final static Logger LOGGER = LoggerFactory.getLogger(AccountTypeProcessor.class); + + public AccountTypeProcessor() { + this(HttpConfigurator.detectProcessorConfiguration(StreamsConfigurator.config.getConfig("peoplepattern"))); + } + + public AccountTypeProcessor(HttpProcessorConfiguration peoplePatternConfiguration) { + super(peoplePatternConfiguration); + LOGGER.info("creating AccountTypeProcessor"); + configuration.setProtocol("https"); + configuration.setHostname("api.peoplepattern.com"); + configuration.setResourcePath("/v0.2/account_type/"); + configuration.setEntity(HttpProcessorConfiguration.Entity.ACTOR); + configuration.setExtension("account_type"); + } + + /** + Override this to add parameters to the request + */ + @Override + protected Map<String, String> prepareParams(StreamsDatum entry) { + Activity activity = mapper.convertValue(entry.getDocument(), Activity.class); + //Actor actor = mapper.convertValue(entry.getDocument(), Actor.class); + Actor actor = activity.getActor(); + ObjectNode actorObjectNode = mapper.convertValue(actor, ObjectNode.class); + String username = (String) ExtensionUtil.getExtension(actorObjectNode, "screenName"); + Map<String, String> params = Maps.newHashMap(); + params.put("id", actor.getId()); + params.put("name", actor.getDisplayName()); + params.put("username", username); + params.put("description", actor.getSummary()); + return params; + } +}; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a575322/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java new file mode 100644 index 0000000..6ffbb9b --- /dev/null +++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.peoplepattern; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Maps; +import org.apache.streams.components.http.HttpConfigurator; +import org.apache.streams.components.http.HttpProcessorConfiguration; +import org.apache.streams.components.http.processor.SimpleHTTPGetProcessor; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.data.util.ExtensionUtil; +import org.apache.streams.pojo.json.Activity; +import org.apache.streams.pojo.json.Actor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * Enrich actor with demographics + */ +public class DemographicsProcessor extends SimpleHTTPGetProcessor { + + public final static String STREAMS_ID = "DemographicsProcessor"; + + private final static Logger LOGGER = LoggerFactory.getLogger(DemographicsProcessor.class); + + public DemographicsProcessor() { + this(HttpConfigurator.detectProcessorConfiguration(StreamsConfigurator.config.getConfig("peoplepattern"))); + } + + public DemographicsProcessor(HttpProcessorConfiguration peoplePatternConfiguration) { + super(peoplePatternConfiguration); + LOGGER.info("creating DemographicsProcessor"); + configuration.setProtocol("https"); + configuration.setHostname("api.peoplepattern.com"); + configuration.setResourcePath("/v0.2/demographics/"); + configuration.setEntity(HttpProcessorConfiguration.Entity.ACTOR); + configuration.setExtension("demographics"); + } + + /** + Override this to add parameters to the request + */ + @Override + protected Map<String, String> prepareParams(StreamsDatum entry) { + Activity activity = mapper.convertValue(entry.getDocument(), Activity.class); + //Actor actor = mapper.convertValue(entry.getDocument(), Actor.class); + Actor actor = activity.getActor(); + ObjectNode actorObjectNode = mapper.convertValue(actor, ObjectNode.class); + String username = (String) ExtensionUtil.getExtension(actorObjectNode, "screenName"); + Map<String, String> params = Maps.newHashMap(); + params.put("id", actor.getId()); + params.put("name", actor.getDisplayName()); + params.put("username", username); + params.put("description", actor.getSummary()); + return params; + } + +}; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a575322/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/AccountType.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/AccountType.json b/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/AccountType.json new file mode 100644 index 0000000..5656b44 --- /dev/null +++ b/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/AccountType.json @@ -0,0 +1,27 @@ +{ + "type": "object", + "$schema": "http://json-schema.org/draft-03/schema", + "id": "#", + "javaType": "org.apache.streams.peoplepattern.AccountType", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "prediction" : { + "type" : "string", + "enum" : [ + "person", + "organization", + "entertainment", + "adult", + "spam", + "no-prediction" + ], + "default": "no-prediction" + }, + "score": { + "type": "number" + }, + "id": { + "type": "string" + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a575322/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/Demographics.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/Demographics.json b/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/Demographics.json new file mode 100644 index 0000000..d1f64d8 --- /dev/null +++ b/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/Demographics.json @@ -0,0 +1,60 @@ +{ + "type": "object", + "$schema": "http://json-schema.org/draft-03/schema", + "id": "#", + "javaType": "org.apache.streams.peoplepattern.Demographics", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "age": { + "type": "object", + "properties": { + "prediction": { + "type": "integer", + "default": 1990 + }, + "score": { + "type": "number" + } + + } + }, + "gender" : { + "type": "object", + "properties": { + "prediction": { + "type": "string", + "enum": [ + "male", + "female", + "no-prediction" + ], + "default": "no-prediction" + }, + "score": { + "type": "number" + } + } + }, + "race" : { + "type": "object", + "properties": { + "prediction": { + "type": "string", + "enum": [ + "black", + "east-asian", + "hispanic", + "middle-eastern", + "south-asian", + "white", + "no-prediction" + ], + "default": "no-prediction" + }, + "score": { + "type": "number" + } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a575322/streams-contrib/streams-processor-peoplepattern/src/main/resources/templates/peoplepatternactor.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/resources/templates/peoplepatternactor.json b/streams-contrib/streams-processor-peoplepattern/src/main/resources/templates/peoplepatternactor.json new file mode 100644 index 0000000..9a24c5c --- /dev/null +++ b/streams-contrib/streams-processor-peoplepattern/src/main/resources/templates/peoplepatternactor.json @@ -0,0 +1,25 @@ +{ + "order": 20, + "template": "*activity*", + "settings": {}, + "mappings": { + "activity": { + "properties": { + "actor": { + "properties": { + "extensions": { + "properties": { + "account_type": { + "type": "nested" + }, + "demographics": { + "type": "nested" + } + } + } + } + } + } + } + } +} \ No newline at end of file
