This is an automated email from the ASF dual-hosted git repository. sblackmon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/streams.git
The following commit(s) were added to refs/heads/master by this push: new 8328cb3 STREAMS-647 AS 2.0 RDF serialization in streams-processor-fullcontact new 4c024f8 Merge pull request #485 from steveblackmon/STREAMS-647 8328cb3 is described below commit 8328cb3ba5f24ea0696e782cc76b111d65da9907 Author: Steve Blackmon <sblack...@apache.org> AuthorDate: Tue Jul 16 12:00:44 2019 -0500 STREAMS-647 AS 2.0 RDF serialization in streams-processor-fullcontact https://issues.apache.org/jira/browse/STREAMS-647 --- .../src/main/resources/fullcontact.ttl | 107 ++++ .../fullcontact/FullContactSocialGraph.scala | 164 +++++++ .../fullcontact/PersonEnrichmentProcessor.scala | 175 +++++++ .../fullcontact/util/AffinityOrdering.scala | 38 ++ .../fullcontact/util/FullContactUtils.scala | 537 +++++++++++++++++++++ 5 files changed, 1021 insertions(+) diff --git a/streams-contrib/streams-processor-fullcontact/src/main/resources/fullcontact.ttl b/streams-contrib/streams-processor-fullcontact/src/main/resources/fullcontact.ttl new file mode 100644 index 0000000..7275e00 --- /dev/null +++ b/streams-contrib/streams-processor-fullcontact/src/main/resources/fullcontact.ttl @@ -0,0 +1,107 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +PREFIX : <http://streams.apache.org/fullcontact#> +PREFIX as: <http://www.w3.org/ns/activitystreams#> +PREFIX dc: <http://purl.org/dc/elements/1.1/> +PREFIX dct: <http://purl.org/dc/terms/> +PREFIX owl: <http://www.w3.org/2002/07/owl#> +PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> +PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#> +PREFIX vcard: <http://www.w3.org/2006/vcard/ns#> +PREFIX xsd: <http://www.w3.org/2001/XMLSchema#> +BASE <http://streams.apache.org/fullcontact#> + +:PersonSummary a owl:Thing ; + rdfs:comment "PersonSummary"@en ; + rdfs:label "PersonSummary"@en . + +:ageRange a owl:DatatypeProperty ; + rdfs:label "ageRange"@en ; + rdfs:comment "Age range of the contact."@en ; + rdfs:domain :PersonSummary ; + rdfs:range xsd:string . + +:avatar a owl:DatatypeProperty ; + rdfs:label "avatar"@en ; + rdfs:comment "URL of the contact's photo."@en ; + rdfs:domain :PersonSummary ; + rdfs:range xsd:string . + +:bio a owl:DatatypeProperty ; + rdfs:label "bio"@en ; + rdfs:comment "Biography of the contact."@en ; + rdfs:domain :PersonSummary ; + rdfs:range xsd:string . + +:email a owl:DatatypeProperty ; + rdfs:label "email"@en ; + rdfs:comment "The email address of the contact. (Queryable)"@en ; + rdfs:domain :PersonSummary ; + rdfs:range xsd:string . + +:organization a owl:DatatypeProperty ; + rdfs:label "organization"@en ; + rdfs:comment "Current or most recent place of work."@en ; + rdfs:domain :PersonSummary ; + rdfs:range xsd:string . + +:title a owl:DatatypeProperty ; + rdfs:label "title"@en ; + rdfs:comment "Current or most recent job title."@en ; + rdfs:domain :PersonSummary ; + rdfs:range xsd:string . + +:phone a owl:DatatypeProperty ; + rdfs:label "phone"@en ; + rdfs:comment "Phone number of the contact. (Queryable)"@en ; + rdfs:domain :PersonSummary ; + rdfs:range xsd:string . 
+ +:updated a owl:DatatypeProperty ; + rdfs:label "updated"@en ; + rdfs:comment "Date-time last updated."@en ; + rdfs:domain :PersonSummary ; + rdfs:range xsd:string . + +:website a owl:DatatypeProperty ; + rdfs:label "website"@en ; + rdfs:comment "URL of the contact's website."@en ; + rdfs:domain :PersonSummary ; + rdfs:range xsd:string . + +:PersonDetails a owl:Thing ; + rdfs:comment "PersonDetails"@en ; + rdfs:label "PersonDetails"@en . + +:details a owl:ObjectProperty ; + rdfs:label "details"@en ; + rdfs:comment "When included, additional details about the contact provided through Data Add-ons will be available here."@en ; + rdfs:domain :PersonSummary ; + rdfs:range :PersonDetails . + +:website a owl:DatatypeProperty ; + rdfs:label "website"@en ; + rdfs:comment "URL of the contact's website."@en ; + rdfs:domain :PersonSummary ; + rdfs:range xsd:string . + +:gender a owl:DatatypeProperty ; + rdfs:label "gender"@en ; + rdfs:comment "Gender of the contact."@en ; + rdfs:domain :PersonDetails ; + rdfs:range xsd:string . + diff --git a/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/FullContactSocialGraph.scala b/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/FullContactSocialGraph.scala new file mode 100644 index 0000000..929ff09 --- /dev/null +++ b/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/FullContactSocialGraph.scala @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.fullcontact + +import java.io.InputStream +import java.io.OutputStream +import java.io.{BufferedInputStream, File, FileInputStream, FileOutputStream, PrintStream} +import java.util.Scanner +import java.util.concurrent.Callable + +import com.google.common.base.Preconditions +import com.typesafe.config.Config +import org.apache.juneau.json.JsonParser +import org.apache.streams.config.StreamsConfigurator +import org.apache.streams.fullcontact.FullContactSocialGraph.FullContactSocialGraphStats +import org.apache.streams.fullcontact.FullContactSocialGraph.typesafe +import org.apache.streams.fullcontact.pojo.PersonSummary +import org.apache.streams.fullcontact.util.FullContactUtils + +import scala.io.Source +import scala.util.Failure +import scala.util.Success +import scala.util.Try + +/** + * Produce an activity streams 2.0 social graph from full contact response data file. + */ +object FullContactSocialGraph { + + lazy val typesafe: Config = StreamsConfigurator.getConfig.getConfig("org.apache.streams.fullcontact.FullContactSocialGraph") + + /** + * To use from command line: + * + * <p/> + * java -cp streams-dist-jar-with-dependencies.jar -Dconfig.file=application.conf org.apache.streams.fullcontact.FullContactSocialGraph + * + * <p/> + * Input stream should contain a series of json-serialized `PersonSummary` objects. + * + * <p/> + * Output stream will contain a TTL-serialized social graph. 
+ * + * <p/> + * Input to the process is: + * A file if application.conf contains an 'input' key + * A file if -Dinput= is specified + * stdin otherwise + * + * Output from the process is: + * A file if application.conf contains an 'output' key + * A file if -Doutput= is specified + * stdout otherwise + * + * @link org.apache.streams.fullcontact.FullContactSocialGraph + * @throws Exception Exception + */ + @throws[Exception] + final def main(args: Array[String]): Unit = { + + val inputStream: InputStream = if (typesafe.hasPath("input")) { + new FileInputStream(new File(typesafe.getString("input"))) + } else System.in + + val outputStream: OutputStream = if (typesafe.hasPath("output")) { + new FileOutputStream(new File(typesafe.getString("output"))) + } else System.out + + val job = Try(new FullContactSocialGraph(inputStream, outputStream)) + + job match { + case Success(_) => return job.get + case Failure(t : Throwable) => throw new Exception(t) + } + } + + case class FullContactSocialGraphStats( + inputLines : Int, + personSummaries : Int, + allOrganizations : Int, + allInterestItems : Int, + uniqueInterests : Int, + topicHierarchy : Int, + allProfiles : Int, + allProfileRelationships : Int, + allEmploymentItems : Int, + uniqueEmployers : Int, + allUrlRelationships : Int, + allImageRelationships : Int + ) +} + +class FullContactSocialGraph(in: InputStream, out: OutputStream ) extends Callable[FullContactSocialGraphStats] { + + val inputStream: BufferedInputStream = new BufferedInputStream(in) + val outputStream: PrintStream = new PrintStream(out) + + override def call() : FullContactSocialGraphStats = { + + val input = Source.fromInputStream (inputStream) + val inputLines = input.getLines() + + // sequence of all PersonSummary + val personSummaries = inputLines.map (JsonParser.DEFAULT.parse (_, classOf[PersonSummary] ) ).toSeq + + // PersonSummary derived sequences + val allOrganizations = FullContactUtils.allOrganizationItems (personSummaries.toIterator).toSeq + 
val allInterestItems = FullContactUtils.allInterestItems (personSummaries.toIterator).toSeq + val uniqueInterests = FullContactUtils.uniqueInterests (allInterestItems.toIterator).toSeq + val topicHierarchy = FullContactUtils.topicHierarchy (allInterestItems.toIterator).toSeq + + val allProfiles = FullContactUtils.allProfiles (personSummaries.toIterator).seq + val allProfileRelationships = FullContactUtils.allProfileRelationships (personSummaries.toIterator).seq + + val allEmploymentItems = FullContactUtils.allEmploymentItems (personSummaries.toIterator).seq + val uniqueEmployers = FullContactUtils.uniqueEmployers (allEmploymentItems).seq + + val allUrlRelationships = FullContactUtils.allUrlRelationships (personSummaries.toIterator).seq + val allImageRelationships = FullContactUtils.allImageRelationships (personSummaries.toIterator).seq + + personSummaries.flatMap (FullContactUtils.safe_personSummaryAsTurtle).foreach (outputStream.println (_) ) + allOrganizations.flatMap (FullContactUtils.safe_organizationAsTurtle).foreach (outputStream.println (_) ) + + uniqueInterests.flatMap (FullContactUtils.safe_interestTopicAsTurtle).foreach (outputStream.println (_) ) + topicHierarchy.map (FullContactUtils.topicRelationshipAsTurtle).foreach (outputStream.println (_) ) + + allUrlRelationships.flatMap (FullContactUtils.safe_urlRelationshipAsTurtle).foreach (outputStream.println (_) ) + allImageRelationships.flatMap (FullContactUtils.safe_imageRelationshipAsTurtle).foreach (outputStream.println (_) ) + + allProfiles.flatMap (FullContactUtils.safe_profileAsTurtle).foreach (outputStream.println (_) ) + allProfileRelationships.flatMap (FullContactUtils.safe_personProfileRelationshipAsTurtle).foreach (outputStream.println (_) ) + + FullContactSocialGraphStats( + inputLines = inputLines.size, + personSummaries = personSummaries.size, + allOrganizations = allOrganizations.size, + allInterestItems = allInterestItems.size, + uniqueInterests = uniqueInterests.size, + topicHierarchy = 
topicHierarchy.size, + allProfiles = allProfiles.size, + allProfileRelationships = allProfileRelationships.size, + allEmploymentItems = allEmploymentItems.size, + uniqueEmployers = uniqueEmployers.size, + allUrlRelationships = allUrlRelationships.size, + allImageRelationships = allImageRelationships.size + ) + } +} diff --git a/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/PersonEnrichmentProcessor.scala b/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/PersonEnrichmentProcessor.scala new file mode 100644 index 0000000..15d06c9 --- /dev/null +++ b/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/PersonEnrichmentProcessor.scala @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.streams.fullcontact + +import java.io.BufferedInputStream +import java.io.BufferedOutputStream +import java.io.File +import java.io.FileInputStream +import java.io.FileOutputStream +import java.io.PrintStream +import java.util.Scanner + +import com.typesafe.config.Config +import org.apache.streams.config.ComponentConfigurator +import org.apache.streams.config.StreamsConfigurator +import org.apache.streams.core.StreamsDatum +import org.apache.streams.core.StreamsProcessor +import org.apache.streams.fullcontact.api.EnrichPersonRequest +import org.apache.streams.fullcontact.config.FullContactConfiguration +import org.apache.streams.fullcontact.pojo.PersonSummary + +import scala.collection.JavaConversions._ +import scala.io.Source +import scala.util.Failure +import scala.util.Success +import scala.util.Try + +/** + * Call enrich persons on a series of requests + */ +object PersonEnrichmentProcessor { + + lazy val typesafe: Config = StreamsConfigurator.getConfig.getConfig("org.apache.streams.fullcontact.PersonEnrichmentProcessor") + + /** + * To use from command line: + * + * <p/> + * Supply (at least) the following required configuration in application.conf: + * + * <p/> + * org.apache.streams.fullcontact.config.FullContactConfiguration.token = "" + * + * <p/> + * Launch syntax: + * + * <p/> + * java -cp streams-dist-jar-with-dependencies.jar -Dconfig.file=./application.conf org.apache.streams.fullcontact.PersonEnrichmentProcessor + * + * <p/> + * Input to the process is: + * A file if application.conf contains an 'input' key + * A file if -Dinput= is specified + * stdin otherwise + * + * Output from the process is: + * A file if application.conf contains an 'output' key + * A file if -Doutput= is specified + * stdout otherwise + * + * @link org.apache.streams.fullcontact.api.EnrichPersonRequest + * @param args application.conf input.jsonl output.jsonl + * @throws Exception Exception + */ + @throws[Exception] + final def main(args: 
Array[String]): Unit = { + + val inputStream = if (typesafe.hasPath("input")) { + new BufferedInputStream(new FileInputStream(new File(typesafe.getString("input")))) + } else System.in + + val outputStream = if (typesafe.hasPath("output")) { + new PrintStream(new FileOutputStream(new File(typesafe.getString("output")))) + } else System.out + + val input = Source.fromInputStream(inputStream) + val inputLines = input.getLines() + val inputDatums = inputLines.map(entry => new StreamsDatum(entry)) + + val outStream = new PrintStream(new BufferedOutputStream(outputStream)) + + val outputDatums = streamDatums(inputDatums) + val outputLines = outputDatums.map(_.getDocument().asInstanceOf[String]) + + for( line <- outputLines ) { + outStream.println(line) + } + outStream.flush() + outStream.close() + } + + def stream( iter : Iterator[EnrichPersonRequest] ) + ( implicit processor : PersonEnrichment = FullContact.getInstance() ) : Iterator[PersonSummary] = { + iter.map( item => processor.enrichPerson(item) ) + } + + def streamDatums( iter : Iterator[StreamsDatum] ) + ( implicit processor : PersonEnrichmentProcessor = new PersonEnrichmentProcessor() ) : Iterator[StreamsDatum] = { + iter.flatMap( item => processor.process(item) ) + } + + def processor : Iterator[EnrichPersonRequest] => Iterator[PersonSummary] = { + stream(_) + } + +} + +class PersonEnrichmentProcessor(config : FullContactConfiguration = new ComponentConfigurator[FullContactConfiguration](classOf[FullContactConfiguration]).detectConfiguration()) + extends StreamsProcessor with Serializable { + + var personEnrichment = FullContact.getInstance(config) + + /** + * Process/Analyze the {@link org.apache.streams.core.StreamsDatum} and return the StreamsDatums that will + * be passed to every downstream operation that reads from this processor. + * + * @param entry StreamsDatum to be processed + * @return resulting StreamsDatums from processing. Should never be null or contain null object. Empty list OK. 
+ */ + override def process(entry: StreamsDatum): java.util.List[StreamsDatum] = { + val request : EnrichPersonRequest = { + entry.getDocument match { + case _ : EnrichPersonRequest => entry.getDocument.asInstanceOf[EnrichPersonRequest] + case _ : String => personEnrichment.parser.parse(entry.getDocument, classOf[EnrichPersonRequest]) + case _ => throw new Exception("invalid input type") + } + } + val attempt = Try(personEnrichment.enrichPerson(request)) + attempt match { + case Success(_ : PersonSummary) => List(new StreamsDatum(personEnrichment.serializer.serialize(attempt.get))) + case Failure(_) => List() + } + } + + /** + * Each operation must publish an identifier. + */ + override def getId: String = "PersonEnrichmentProcessor" + + /** + * This method will be called after initialization/serialization. Initialize any non-serializable objects here. + * + * @param configurationObject Any object to help initialize the operation. ie. Map, JobContext, Properties, etc. The type + * will be based on where the operation is being run (ie. hadoop, storm, locally, etc.) + */ + override def prepare(configurationObject: Any): Unit = { + personEnrichment = FullContact.getInstance(configurationObject.asInstanceOf[FullContactConfiguration]) + } + + /** + * No guarantee that this method will ever be called. But upon shutdown of the stream, an attempt to call this method + * will be made. + * Use this method to terminate connections, etc. 
+ */ + override def cleanUp(): Unit = { + personEnrichment.restClient.close() + personEnrichment = null + } +} diff --git a/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/util/AffinityOrdering.scala b/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/util/AffinityOrdering.scala new file mode 100644 index 0000000..dabadc1 --- /dev/null +++ b/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/util/AffinityOrdering.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.streams.fullcontact.util + +import org.apache.juneau.ObjectMap + +object AffinityOrdering extends Ordering[ObjectMap] { + def sortByAffinityString(s1: String, s2: String) : Int = { + if(s1 == null && s2 == null) return 0; + if(s1 == null ) return -1; + if(s2 == null ) return 1; + if(s1.isEmpty && s2.isEmpty) return 0; + if(s1 == s2) return 0; + if(s1.isEmpty ) return -1; + if(s2.isEmpty ) return 1; + return s1.takeRight(1).compareToIgnoreCase(s2.takeRight(1)) + } + def sortByAffinity(o1: ObjectMap, o2: ObjectMap) : Int = { + return sortByAffinityString(o1.getString("affinity", ""), o2.getString("affinity", "")) + } + def compare(a:ObjectMap, b:ObjectMap) = sortByAffinity(a,b) +} diff --git a/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/util/FullContactUtils.scala b/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/util/FullContactUtils.scala new file mode 100644 index 0000000..e6db251 --- /dev/null +++ b/streams-contrib/streams-processor-fullcontact/src/main/scala/org/apache/streams/fullcontact/util/FullContactUtils.scala @@ -0,0 +1,537 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.streams.fullcontact.util + +import org.apache.juneau.ObjectList +import org.apache.juneau.ObjectMap +import org.apache.juneau.json.JsonParser +import org.apache.streams.config.ComponentConfigurator + +import scala.collection.mutable.ListBuffer + +case class Employer(name : String, domain : String) + +case class EmployerRelationship(personId : String, employerId : String) + +case class InterestTopic(id : String, name : String, category : String) + +case class TopicRelationship(child : String, parent : String) + +case class Organization(name : String, domain : String) + +case class OrganizationRelationship(personId : String, orgId : String) + +case class PersonProfileRelationship(personId : String, profileUri : String) + +case class ImageRelationship( entityUri : String, url : String, label : String) + +case class UrlRelationship( entityUri : String, url : String, label : String) + +object FullContactUtils { + + import org.apache.streams.fullcontact.api._ + import org.apache.streams.fullcontact.config._ + import org.apache.streams.fullcontact.pojo._ + + import scala.collection.JavaConversions._ + import scala.util.Try + + final implicit val fullContactConfiguration = new ComponentConfigurator(classOf[FullContactConfiguration]).detectConfiguration() + + val apst_angellist_ns = "http://streams.apache.org/streams-contrib/streams-provider-angellist#" + val apst_facebook_ns = "http://streams.apache.org/streams-contrib/streams-provider-facebook#" + val apst_foursquare_ns = "http://streams.apache.org/streams-contrib/streams-provider-foursquare#" + val apst_googleplus_ns = "http://streams.apache.org/streams-contrib/streams-provider-googleplus#" + val apst_instagram_ns = "http://streams.apache.org/streams-contrib/streams-provider-instagram#" + val apst_linkedin_ns = "http://streams.apache.org/streams-contrib/streams-provider-linkedin#" + val apst_twitter_ns = "http://streams.apache.org/streams-contrib/streams-provider-twitter#" + + val fc_ns = 
"http://api.fullcontact.com/" + val fc_image_ns = "http://api.fullcontact.com/image/" + val fc_org_ns = "http://api.fullcontact.com/organization/" + val fc_person_ns = "http://api.fullcontact.com/person/" + val fc_profile_ns = "http://api.fullcontact.com/profile/" + val fc_topic_ns = "http://api.fullcontact.com/topic/" + val fc_url_ns = "http://api.fullcontact.com/url/" + + val fc_prefix = "fc" + val fc_image_prefix = "fc_img" + val fc_org_prefix = "fc_org" + val fc_person_prefix = "fc_person" + val fc_profile_prefix = "fc_profile" + val fc_topic_prefix = "fc_topic" + val fc_url_prefix = "fc_url" + + def imgId( url: String ) : String = url.hashCode.toString + + def urlId( url: String ) : String = url.hashCode.toString + + def companyId( company: CompanySummary ) : String = { + company.getName.replaceAll("\\p{Punct}","-") + } + + def orgId( organization: Organization ) : String = { + organization.domain.replaceAll("\\p{Punct}", "-") + } + + def orgLabel( input: String ) : String = { + input.replaceAll("\\p{Punct}", " ") + } + + def orgLabel( organization: Organization ) : String = { + orgLabel(organization.name) + } + + def personId( input : PersonSummary ) : String = { + input.getFullName.replaceAll("\\W","") + } + + def personSafeName( input : PersonSummary ) : String = { + input.getFullName.replaceAll("\\W","") + } + + def profileUsername( input : PersonProfile ) : String = { + input.getUsername.replaceAll("\\W","") + } + + def profileNamespaceAndId( profile : PersonProfile ) : (String,String) = { + profile.getService match { + case "angellist" => (apst_angellist_ns,profileId(profile)) + case "twitter" => (apst_twitter_ns,profileId(profile)) + case "facebook" | "facebookpage" => (apst_facebook_ns,profileId(profile)) + case "foursquare" => (apst_foursquare_ns,profileId(profile)) + case "linkedin" => (apst_linkedin_ns,profileId(profile)) + case "instagram" => (apst_instagram_ns,profileUsername(profile)) + case "googleplus" => (apst_googleplus_ns,profileId(profile)) 
+ } + } + + def uriFromNamespaceAndId( ns_id : (String, String) ) = s"${ns_id._1}${ns_id._2}" + + def profilePrefixAndType( profile : PersonProfile ) : (String,String) = { + profile.getService match { + case "angellist" => ("fc","Profile") + case "twitter" => ("apst","TwitterProfile") + case "facebook" | "facebookpage" => ("apst","FacebookProfile") + case "foursquare" => ("apst","FoursquareProfile") + case "linkedin" => ("apst","LinkedinProfile") + case "instagram" => ("apst","InstagramProfile") + case "googleplus" => ("apst","GooglePlusProfile") + } + } + + def profileId( profile : PersonProfile ) : String = { + val idOrUsername = { + if( profile.getUserid.nonEmpty ) profile.getUserid + else if ( profile.getUsername.nonEmpty ) profile.getUsername + else profile.getUrl.split("/").last + } + s"${profile.getService}/${idOrUsername}" + } + + def topicId( topic : InterestTopic ) : String = { + topic.id + } + + def selectProfiles( profiles : PersonProfiles) : Seq[PersonProfile] = { + val list = new ListBuffer[PersonProfile] + list += profiles.getAngellist + list += profiles.getFacebook + list += profiles.getFacebookpage + list += profiles.getFoursquare + list += profiles.getGoogleplus + list += profiles.getInstagram + list += profiles.getLinkedin + list += profiles.getTwitter + list + } + + def personProfileRelationships( summary : PersonSummary ) : Seq[PersonProfileRelationship] = { + val list = new ListBuffer[PersonProfileRelationship] + for( profile <- selectProfiles(summary.getDetails.getProfiles())) { + val person_id = Try(personId(summary)) + val profile_uri = Try(uriFromNamespaceAndId(profileNamespaceAndId(profile))) + if( person_id.isSuccess && + person_id.get != null && + !person_id.get.isEmpty && + profile_uri.isSuccess && + profile_uri.get != null && + !profile_uri.get.isEmpty ) { + val rel = PersonProfileRelationship(person_id.get, profile_uri.get) + list += rel + } + } + list + } + + def allInterestItems( input : Iterator[PersonSummary] ) : 
Iterator[PersonInterestItem] = { + input.flatMap(person => person.getDetails.getInterests.toSeq) + } + + def uniqueInterests( input : Iterator[PersonInterestItem] ) : Iterator[InterestTopic] = { + input.map(interestItem => InterestTopic(interestItem.getId, interestItem.getName, interestItem.getCategory)).toSet.toIterator + } + + def topicHierarchy( input : Iterator[PersonInterestItem] ) : Iterator[TopicRelationship] = { + input.flatMap(item => item.getParentIds.map(parentId => TopicRelationship(item.getId, parentId))).toSet.toIterator + } + + def allImageRelationships( input : Iterator[PersonSummary] ) : Iterator[ImageRelationship] = { + input.flatMap(item => item.getDetails.getPhotos.flatMap( + photo => Try(ImageRelationship(uriFromNamespaceAndId(fc_person_ns,personId(item)), photo.getValue, photo.getLabel)).toOption + )) + } + + def allUrlRelationships( input : Iterator[PersonSummary] ) : Iterator[UrlRelationship] = { + input.flatMap(item => item.getDetails.getUrls.flatMap( + url => Try(UrlRelationship(uriFromNamespaceAndId(fc_person_ns,personId(item)), url.getValue, url.getLabel)).toOption + )) + } + + def allOrganizationItems( input : Iterator[PersonSummary] ) : Iterator[Organization] = { + input.flatMap(person => person.getDetails.getEmployment.map(employment => Organization(employment.getName, employment.getDomain))).toSet.toIterator + } + + def allProfiles( input : Iterator[PersonSummary] ) : Iterator[PersonProfile] = { + input.flatMap(summary => selectProfiles(summary.getDetails.getProfiles)) + } + + def allProfileRelationships( input : Iterator[PersonSummary] ) : Iterator[PersonProfileRelationship] = { + input.flatMap(personProfileRelationships) + } + + def allEmploymentItems( input : Iterator[PersonSummary] ) : Iterator[PersonEmploymentItem] = { + input.flatMap(person => person.getDetails.getEmployment.toSeq) + } + + def uniqueEmployers( input : Iterator[PersonEmploymentItem] ) : Iterator[Employer] = { + input.map(employmentItem => 
Employer(employmentItem.getName, employmentItem.getDomain)).toSet.toIterator
  }

  /**
    * Serializes the core attributes of a person as Turtle.
    *
    * Emits one subject block (type, display name, url, modified date, vcard
    * fn/org/title) followed by one statement per gender, email, phone and url
    * that is present.
    *
    * @param root person summary to serialize
    * @return Turtle text for the person
    */
  def personSummaryAsTurtle(root: PersonSummary): String = {
    val id = personId(root)
    val sb = new StringBuilder()
    sb.append(s"""|${fc_person_prefix}:${id}
                  | a ${fc_prefix}:Person ;
                  | as:displayName "${root.getFullName}" ;
                  | as:url "${root.getWebsite}" ;
                  | dct:modified "${root.getUpdated}" ;
                  | vcard:fn "${root.getFullName}" ;
                  | vcard:org "${orgLabel(root.getOrganization)}" ;
                  | vcard:title "${root.getTitle}" ;
                  | .
                  |
                  |""".stripMargin)
    // A missing gender must not abort the whole serialization, so treat null like empty.
    Option(root.getGender).filterNot(_.isEmpty).foreach { gender =>
      sb.append(s"""${fc_person_prefix}:${id} vcard:gender "${gender}" . """).append("\n")
    }
    if( root.getDetails.getEmails != null && !root.getDetails.getEmails.isEmpty)
      root.getDetails.getEmails.foreach (
        email => {
          sb.append(s"""${fc_person_prefix}:${id} vcard:email "mailto:${email.getValue}" . """).append("\n")
        }
      )
    if( root.getDetails.getPhones != null && !root.getDetails.getPhones.isEmpty)
      root.getDetails.getPhones.foreach (
        tel => {
          sb.append(s"""${fc_person_prefix}:${id} vcard:tel "tel:${tel.getValue}" . """).append("\n")
        }
      )
    if( root.getDetails.getUrls != null && !root.getDetails.getUrls.isEmpty)
      root.getDetails.getUrls.foreach (
        url => {
          sb.append(s"""${fc_person_prefix}:${id} vcard:url "${url.getValue}" . """).append("\n")
        }
      )
    sb.toString
  }

  /** Same as [[personSummaryAsTurtle]], but returns None instead of throwing. */
  def safe_personSummaryAsTurtle(root: PersonSummary): Option[String] = {
    Try(personSummaryAsTurtle(root)).toOption
  }

  /**
    * Serializes a url relationship as an `as:Link` resource plus the
    * `as:link` statement attaching it to the owning entity.
    *
    * @param item relationship carrying the url, its label and the entity uri
    * @return Turtle text for the link
    */
  def urlRelationshipAsTurtle(item: UrlRelationship) : String = {
    val uri = fc_url_ns + urlId(item.url)
    val sb = new StringBuilder()
    sb.append(s"""|<${uri}>
                  | a as:Link ;
                  | as:href "${item.url}" ;
                  | as:rel "${item.label}" .
                  |
                  |<${item.entityUri}> as:link <${uri}> .
                  |
                  |""".stripMargin)
    sb.toString
  }

  /** Same as [[urlRelationshipAsTurtle]], but returns None instead of throwing. */
  def safe_urlRelationshipAsTurtle(item: UrlRelationship) : Option[String] = {
    Try(urlRelationshipAsTurtle(item)).toOption
  }

  /**
    * Serializes an image relationship as an `as:Image` resource plus the
    * `as:image` statement attaching it to the owning entity.
    *
    * @param item relationship carrying the url, its label and the entity uri
    * @return Turtle text for the image
    */
  def imageRelationshipAsTurtle(item: ImageRelationship) : String = {
    val uri = fc_url_ns + urlId(item.url)
    val sb = new StringBuilder()
    sb.append(s"""|<${uri}>
                  | a as:Image ;
                  | as:href "${item.url}" ;
                  | as:rel "${item.label}" .
                  |
                  |<${item.entityUri}> as:image <${uri}> .
                  |
                  |""".stripMargin)
    sb.toString
  }

  /** Same as [[imageRelationshipAsTurtle]], but returns None instead of throwing. */
  def safe_imageRelationshipAsTurtle(item: ImageRelationship) : Option[String] = {
    Try(imageRelationshipAsTurtle(item)).toOption
  }

  /**
    * Serializes an interest topic as a `skos:Concept` with a `skos:prefLabel`.
    * Punctuation in the topic name is replaced by spaces before it is quoted.
    *
    * @param topic topic to serialize
    * @return Turtle text for the topic
    */
  def interestTopicAsTurtle(topic: InterestTopic): String = {
    val id = topic.id
    val name = topic.name.replaceAll("\\p{Punct}"," ")
    val sb = new StringBuilder()
    sb.append(s"""|${fc_topic_prefix}:${id} a skos:Concept .
                  |${fc_topic_prefix}:${id} skos:prefLabel "${name}" .
                  |
                  |""".stripMargin)
    sb.toString
  }

  /** Same as [[interestTopicAsTurtle]], but returns None instead of throwing. */
  def safe_interestTopicAsTurtle(topic: InterestTopic): Option[String] = {
    Try(interestTopicAsTurtle(topic)).toOption
  }

  /** Serializes a child/parent topic pair as a single `skos:broader` statement. */
  def topicRelationshipAsTurtle(relationship: TopicRelationship): String = {
    s"${fc_topic_prefix}:${relationship.child} skos:broader ${fc_topic_prefix}:${relationship.parent} ."
  }

  /**
    * Serializes one `foaf:interest` statement per interest of the person.
    *
    * @param root person summary whose interests are serialized
    * @return Turtle text, empty when the person has no interests
    */
  def personInterestsAsTurtle(root: PersonSummary): String = {
    val id = personId(root)
    val sb = new StringBuilder()
    if( root.getDetails.getInterests != null && !root.getDetails.getInterests.isEmpty)
      root.getDetails.getInterests.foreach (
        interest => {
          sb.append(s"""${fc_person_prefix}:${id} foaf:interest ${fc_topic_prefix}:${interest.getId} . """).append("\n")
        }
      )
    sb.toString
  }

  /** Same as [[personInterestsAsTurtle]], but returns None instead of throwing. */
  def safe_personInterestsAsTurtle(root: PersonSummary): Option[String] = {
    Try(personInterestsAsTurtle(root)).toOption
  }

  /**
    * Serializes an organization (type, display name, url) as Turtle.
    *
    * @param organization organization to serialize
    * @return Turtle text for the organization
    */
  def organizationAsTurtle(organization: Organization): String = {
    val id = orgId(organization)
    val sb = new StringBuilder()
    sb.append(s"""|${fc_org_prefix}:${id}
                  | a ${fc_prefix}:Organization ;
                  | as:displayName "${orgLabel(organization)}" ;
                  | as:url "${organization.domain}" ;
                  | .
                  |
                  |""".stripMargin)
    sb.toString
  }

  /** Same as [[organizationAsTurtle]], but returns None instead of throwing. */
  def safe_organizationAsTurtle(organization: Organization): Option[String] = {
    Try(organizationAsTurtle(organization)).toOption
  }

  /** Serializes the `as:describes` statement linking a profile uri to a person. */
  def personProfileRelationshipAsTurtle(relationship: PersonProfileRelationship): String = {
    s"<${relationship.profileUri}> as:describes ${fc_person_prefix}:${relationship.personId} ."
  }

  /** Same as [[personProfileRelationshipAsTurtle]], but returns None instead of throwing. */
  def safe_personProfileRelationshipAsTurtle(relationship: PersonProfileRelationship): Option[String] = {
    Try(personProfileRelationshipAsTurtle(relationship)).toOption
  }

  /**
    * Serializes a social profile as Turtle.
    *
    * Punctuation in the bio is replaced by spaces before it is quoted; a
    * missing bio is emitted as an empty summary. Follower and following
    * counts are only emitted when present.
    *
    * @param profile profile to serialize
    * @return Turtle text for the profile, terminated with `.`
    */
  def profileAsTurtle(profile: PersonProfile): String = {
    val id = profileId(profile)
    val uri = uriFromNamespaceAndId(profileNamespaceAndId(profile))
    val prefix_type = profilePrefixAndType(profile)
    // A missing bio must not abort the whole serialization, so treat null like empty.
    val bio = Option(profile.getBio).getOrElse("").replaceAll("\\p{Punct}"," ")
    val sb = new StringBuilder()
    sb.append(s"""|<$uri>
                  | a ${prefix_type._1}:${prefix_type._2} ;
                  | as:id "${id}" ;
                  | as:name "${profile.getUsername}" ;
                  | as:displayName "${profile.getUsername}" ;
                  | as:summary "${bio}" ;
                  | as:url "${profile.getUrl}" ;
                  | as:provider "${profile.getService}" ;
                  |""".stripMargin)
    if( profile.getFollowers != null)
      sb.append(s""" apst:followers "${profile.getFollowers}"^^xs:integer ;""").append("\n")
    if( profile.getFollowing != null)
      sb.append(s""" apst:following "${profile.getFollowing}"^^xs:integer ;""").append("\n")
    // Every predicate above ends with ';', so the subject block still needs its
    // terminating '.', as the other serializers in this object emit.
    sb.append(" .").append("\n")
    sb.append("").append("\n")
    sb.toString
  }

  /** Same as [[profileAsTurtle]], but returns None instead of throwing. */
  def safe_profileAsTurtle(profile: PersonProfile): Option[String] = {
    Try(profileAsTurtle(profile)).toOption
  }


  /**
    * Serializes a company summary as Turtle.
    *
    * Emits one subject block (type, display name, url, summary, employees,
    * founded, category) followed by one statement per email, phone and url
    * that is present. Punctuation in the bio is replaced by spaces; a missing
    * bio is emitted as an empty summary.
    *
    * @param root company summary to serialize
    * @return Turtle text for the company
    */
  def companySummaryAsTurtle(root: CompanySummary): String = {
    import scala.collection.JavaConversions._
    val id = companyId(root)
    // A missing bio must not abort the whole serialization, so treat null like empty.
    val bio = Option(root.getBio).getOrElse("").replaceAll("\\p{Punct}"," ")
    val sb = new StringBuilder()
    sb.append(s"""|${fc_org_prefix}:${id}
                  | a ${fc_prefix}:Organization ;
                  | as:displayName "${root.getName}" ;
                  | as:url "${root.getWebsite}" ;
                  | as:summary "${bio}" ;
                  | ${fc_prefix}:employees "${root.getEmployees}"^^xs:integer ;
                  | ${fc_prefix}:founded "${root.getFounded}"^^xs:integer ;
                  | vcard:category "${root.getCategory}" ;
                  | .
                  |""".stripMargin)
    if( root.getDetails.getEmails != null && !root.getDetails.getEmails.isEmpty)
      root.getDetails.getEmails.foreach (
        email => {
          sb.append(s"""${fc_org_prefix}:${id} vcard:email "mailto:${email.getValue}" . """).append("\n")
        }
      )
    if( root.getDetails.getPhones != null && !root.getDetails.getPhones.isEmpty)
      root.getDetails.getPhones.foreach (
        phone => {
          sb.append(s"""${fc_org_prefix}:${id} vcard:tel "tel:${phone.getValue}" . """).append("\n")
        }
      )
    if( root.getDetails.getUrls != null && !root.getDetails.getUrls.isEmpty)
      root.getDetails.getUrls.foreach (
        url => {
          sb.append(s"""${fc_org_prefix}:${id} vcard:url "${url.getUrl}" . """).append("\n")
        }
      )
    sb.toString
  }

  /** Same as [[companySummaryAsTurtle]], but returns None instead of throwing. */
  def safe_companySummaryAsTurtle(profile: CompanySummary): Option[String] = {
    Try(companySummaryAsTurtle(profile)).toOption
  }

  /**
    * Reduces a dash-separated numeric range such as "25-34" to the integer
    * average of its parts.
    *
    * @param ageRange dash-separated whole numbers
    * @return the truncated average, or None when the input is null or any
    *         part is not a whole number
    */
  def normalize_age_range(ageRange: String) : Option[Int] = {
    Try {
      val parts = ageRange.split("-").map(_.toLong).toSeq
      (parts.sum / parts.length).toInt
    }.toOption
  }

  /**
    * POSTs an enrich request to the FullContact v3 person.enrich endpoint and
    * returns the raw response body.
    *
    * @param request request bean, serialized as JSON
    * @param config configuration supplying the bearer token
    * @return response body as a string
    */
  def callEnrichPerson(request : EnrichPersonRequest)(implicit config : FullContactConfiguration) : String = {
    import java.net.URI

    import org.apache.http.client.utils.URIBuilder
    import org.apache.juneau.json.JsonParser
    import org.apache.juneau.json.JsonSerializer
    import org.apache.juneau.rest.client.RestCall
    import org.apache.juneau.rest.client.RestClient
    val auth_header = s"Bearer ${config.getToken()}"
    val url = "https://api.fullcontact.com/v3/person.enrich"
    val uri : URI = new URIBuilder(url).build()
    val parser = JsonParser.create().
      debug().
      ignoreUnknownBeanProperties(true).
      ignorePropertiesWithoutSetters(true).
      build()
    val serializer = JsonSerializer.create().
      debug().
      trimEmptyCollections(true).
      trimNullProperties(true).
      trimEmptyMaps(true).
      build()
    val restClient : RestClient = RestClient.
      create().
      authorization(auth_header).
      beansRequireSerializable(true).
      debug().
      disableAutomaticRetries().
      disableCookieManagement().
      disableRedirectHandling().
      json().
      parser(parser).
      rootUrl(uri).
      serializer(serializer).
      build()

    // The client is created per call, so release it once the response has been read.
    try {
      val post : RestCall = restClient.
        doPost(uri).
        body(request)

      // NOTE(review): fixed one-second pause before the response is read -
      // presumably crude rate limiting; confirm before removing.
      Thread.sleep(1000)

      post.getResponseAsString
    } finally {
      restClient.close()
    }
  }

  /** Same as [[callEnrichPerson]], but returns None instead of throwing. */
  def safeCallEnrichPerson(request : EnrichPersonRequest) : Option[String] = {
    Try(callEnrichPerson(request)).toOption
  }

  /**
    * Parses a person.enrich JSON response into a PersonSummary.
    *
    * @param response raw JSON response body
    * @return the parsed person summary
    */
  def parseEnrichPersonResponse(response : String): PersonSummary = {
    import org.apache.juneau.json.JsonParser
    JsonParser.DEFAULT.parse(response, classOf[PersonSummary])
  }

  /** Same as [[parseEnrichPersonResponse]], but returns None instead of throwing. */
  def safeParseEnrichPersonResponse(response : String): Option[PersonSummary] = {
    Try(parseEnrichPersonResponse(response)).toOption
  }

  /** Formats one education row as "name degree (startYear - endYear)"; None if formatting throws. */
  def educationItem(row : ObjectMap) : Option[String] = Try(f"""${row.getString("name","")}%s ${row.getString("degree","")}%s (${row.getAt("start/year",classOf[String])}%s - ${row.getAt("end/year",classOf[String])}%s)""").toOption

  /**
    * Parses a JSON array of education rows and formats each one.
    *
    * @param json JSON array text; a parse failure propagates to the caller
    * @return formatted rows, empty when the rows cannot be read
    */
  def educationSummary(json : String) : List[String] = {
    val list = JsonParser.DEFAULT.parse(json, classOf[ObjectList])
    Try(list.elements(classOf[ObjectMap]).flatMap( row => educationItem(row) ))
      .map(_.toList)
      .getOrElse(List())
  }

  /** Formats one employment row as "name title (startYear - endYear)"; None if formatting throws. */
  def employmentItem(row : ObjectMap) : Option[String] = Try(f"""${row.getString("name","")}%s ${row.getString("title","")}%s (${row.getAt("start/year",classOf[String])}%s - ${row.getAt("end/year",classOf[String])}%s)""").toOption

  /**
    * Parses a JSON array of employment rows and formats each one.
    *
    * @param json JSON array text; a parse failure propagates to the caller
    * @return formatted rows, empty when the rows cannot be read
    */
  def employmentSummary(json : String) : List[String] = {
    val list = JsonParser.DEFAULT.parse(json, classOf[ObjectList])
    Try(list.elements(classOf[ObjectMap]).flatMap( row => employmentItem(row)))
      .map(_.toList)
      .getOrElse(List())
  }

  /** Formats one interest row as "name (affinity)"; None if formatting throws. */
  def interestItem(row : ObjectMap) : Option[String] = Try(f"""${row.getString("name","")}%s (${row.getString("affinity","")}%s)""").toOption

  /**
    * Parses a JSON array of interest rows, orders them with AffinityOrdering
    * and formats each one.
    *
    * @param json JSON array text; a parse failure propagates to the caller
    * @return formatted rows, empty when the rows cannot be read or ordered
    */
  def interestSummary(json : String) : List[String] = {
    val list = JsonParser.DEFAULT.parse(json, classOf[ObjectList])
    Try(list.elements(classOf[ObjectMap]).toSeq.sorted(AffinityOrdering).flatMap( row => interestItem(row)))
      .map(_.toList)
      .getOrElse(List())
  }

  /** Extracts the "url" entry of a profile row as a string; None if that throws. */
  def profileItem(row : ObjectMap) : Option[String] = Try(f"""${row.get("url")}%s""").toOption

  /**
    * Parses a JSON object whose values are profile rows and returns their
    * urls in sorted order.
    *
    * @param json JSON object text; a parse failure propagates to the caller
    * @return sorted urls, empty when the rows cannot be read
    */
  def profileSummary(json : String) : List[String] = {
    val map = JsonParser.DEFAULT.parse(json, classOf[ObjectMap])
    Try(map.entrySet().flatMap( row => profileItem(row.getValue().asInstanceOf[ObjectMap])).toSeq.sorted)
      .map(_.toList)
      .getOrElse(List())
  }

  /** Extracts the "value" entry of a url row as a string; None if that throws. */
  def urlItem(row : ObjectMap) : Option[String] = Try(f"""${row.get("value")}%s""").toOption

  /**
    * Parses a JSON array of url rows and returns their values in sorted order.
    *
    * @param json JSON array text; a parse failure propagates to the caller
    * @return sorted values, empty when the rows cannot be read
    */
  def urlSummary(json : String) : List[String] = {
    val list = JsonParser.DEFAULT.parse(json, classOf[ObjectList])
    Try(list.elements(classOf[ObjectMap]).flatMap( row => urlItem(row)).toSeq.sorted)
      .map(_.toList)
      .getOrElse(List())
  }

}