http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailMessageActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailMessageActivitySerializer.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailMessageActivitySerializer.java new file mode 100644 index 0000000..a53eaa5 --- /dev/null +++ b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailMessageActivitySerializer.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package com.google.gmail.provider; + +import com.fasterxml.jackson.annotation.JsonBackReference; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonManagedReference; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.AnnotationIntrospector; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector; +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.googlecode.gmail4j.GmailException; +import com.googlecode.gmail4j.GmailMessage; +import com.googlecode.gmail4j.javamail.JavaMailGmailMessage; +import com.sun.mail.imap.IMAPFolder; +import com.sun.mail.imap.IMAPMessage; +import com.sun.mail.imap.IMAPSSLStore; +import org.apache.commons.lang.NotImplementedException; +import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.pojo.json.*; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.mail.internet.MimeMultipart; +import java.io.IOException; +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.Map; + +import static org.apache.streams.data.util.ActivityUtil.ensureExtensions; + +/** +* Created with IntelliJ IDEA. +* User: mdelaet +* Date: 9/30/13 +* Time: 9:24 AM +* To change this template use File | Settings | File Templates. 
+*/ +public class GMailMessageActivitySerializer implements ActivitySerializer<GmailMessage> { + + private static final Logger LOGGER = LoggerFactory.getLogger(GMailMessageActivitySerializer.class); + + GMailProvider provider; + + ObjectMapper mapper = new ObjectMapper(); + + public GMailMessageActivitySerializer(GMailProvider provider) { + + this.provider = provider; + + mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, Boolean.FALSE); + + mapper.addMixInAnnotations(IMAPSSLStore.class, MessageMixIn.class); + mapper.addMixInAnnotations(IMAPFolder.class, MessageMixIn.class); + mapper.addMixInAnnotations(IMAPMessage.class, MessageMixIn.class); + mapper.addMixInAnnotations(MimeMultipart.class, MessageMixIn.class); + mapper.addMixInAnnotations(JavaMailGmailMessage.class, MessageMixIn.class); + + } + + public GMailMessageActivitySerializer() { + } + + @Override + public String serializationFormat() { + return "gmail.v1"; + } + + @Override + public GmailMessage serialize(Activity activity) { + return null; + } + + @Override + public Activity deserialize(GmailMessage gmailMessage) { + + Activity activity = new Activity(); + activity.setId(formatId(this.provider.getConfig().getUserName(), String.valueOf(gmailMessage.getMessageNumber()))); + activity.setPublished(new DateTime(gmailMessage.getSendDate())); + Provider provider = new Provider(); + provider.setId("http://gmail.com"); + provider.setDisplayName("GMail"); + activity.setProvider(provider); + Actor actor = new Actor(); + actor.setId(gmailMessage.getFrom().getEmail()); + actor.setDisplayName(gmailMessage.getFrom().getName()); + activity.setActor(actor); + activity.setVerb("email"); + ActivityObject object = new ActivityObject(); + try { + object.setId(gmailMessage.getTo().get(0).getEmail()); + object.setDisplayName(gmailMessage.getTo().get(0).getName()); + } catch( GmailException e ) { + LOGGER.warn(e.getMessage()); + } + activity.setTitle(gmailMessage.getSubject()); + try { + 
activity.setContent(gmailMessage.getContentText()); + } catch( GmailException e ) { + LOGGER.warn(e.getMessage()); + } + activity.setObject(object); + +// try { +// // if jackson can't serialize the object, find out now +// String jsonString = mapper.writeValueAsString(gmailMessage); +// ObjectNode jsonObject = mapper.valueToTree(gmailMessage); +// // since it can, write the entire source object to extensions.gmail +// Map<String, Object> extensions = Maps.newHashMap(); +// extensions.put("gmail", gmailMessage); +// activity.setAdditionalProperty("extensions", extensions); +// } catch (JsonProcessingException e) { +// LOGGER.debug("Failed Json Deserialization"); +// e.printStackTrace(); +// } + + return activity; + } + + @Override + public List<Activity> deserializeAll(List<GmailMessage> serializedList) { + throw new NotImplementedException("Not currently implemented"); + } + + public Activity convert(ObjectNode event) { + return null; + } + + public static Generator buildGenerator(ObjectNode event) { + return null; + } + + public static Icon getIcon(ObjectNode event) { + return null; + } + + public static Provider buildProvider(ObjectNode event) { + Provider provider = new Provider(); + provider.setId("id:providers:gmail"); + return provider; + } + + public static List<Object> getLinks(ObjectNode event) { + return null; + } + + public static String getUrls(ObjectNode event) { + return null; + } + + public static void addGMailExtension(Activity activity, GmailMessage gmailMessage) { + Map<String, Object> extensions = ensureExtensions(activity); + extensions.put("gmail", gmailMessage); + } + + public static String formatId(String... idparts) { + return Joiner.on(":").join(Lists.asList("id:gmail", idparts)); + } + + interface MessageMixIn { + @JsonManagedReference + @JsonIgnore + IMAPSSLStore getDefaultFolder(); // we don't need it! + @JsonManagedReference + @JsonIgnore + IMAPSSLStore getPersonalNamespaces(); // we don't need it! 
+ @JsonManagedReference + @JsonIgnore + IMAPFolder getStore(); // we don't need it! + // @JsonManagedReference +// @JsonIgnore +// @JsonBackReference + //IMAPFolder getParent(); // we don't need it! + @JsonManagedReference + @JsonIgnore + @JsonBackReference + IMAPMessage getFolder(); // we don't need it! + @JsonManagedReference + @JsonIgnore + @JsonProperty("parent") + @JsonBackReference + MimeMultipart getParent(); + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusCommentProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusCommentProcessor.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusCommentProcessor.java index 8f0f491..583c741 100644 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusCommentProcessor.java +++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusCommentProcessor.java @@ -32,9 +32,6 @@ import org.slf4j.LoggerFactory; import java.util.List; -/** - * Consider replacing this with a simpler Processer which extends SimpleHTTPGetProcessor - */ public class GooglePlusCommentProcessor implements StreamsProcessor { private final static String STREAMS_ID = "GooglePlusCommentProcessor"; private final static Logger LOGGER = LoggerFactory.getLogger(GooglePlusCommentProcessor.class); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java index 52b29dd..73e261f 100644 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java +++ 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java @@ -38,10 +38,6 @@ import org.slf4j.LoggerFactory; import java.util.List; import java.util.Queue; -@Deprecated -/* - * Modules and streams should adopt TypeConverterProcessor and ActivityConverterProcessor - */ public class GooglePlusTypeConverter implements StreamsProcessor { public final static String STREAMS_ID = "GooglePlusTypeConverter"; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivityConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivityConverter.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivityConverter.java deleted file mode 100644 index e4a1f5d..0000000 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivityConverter.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package com.google.gplus.provider; - -import com.google.gplus.serializer.util.GooglePlusActivityUtil; -import org.apache.commons.lang.NotImplementedException; -import org.apache.streams.data.ActivityConverter; -import org.apache.streams.pojo.json.Activity; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - - -public class GPlusActivityConverter implements ActivityConverter<com.google.api.services.plus.model.Activity> { - - private static final Logger LOGGER = LoggerFactory.getLogger(GPlusActivityConverter.class); - - AbstractGPlusProvider provider; - - public GPlusActivityConverter(AbstractGPlusProvider provider) { - - this.provider = provider; - } - - public GPlusActivityConverter() { - } - - @Override - public String serializationFormat() { - return "gplus.v1"; - } - - @Override - public com.google.api.services.plus.model.Activity serialize(Activity deserialized) { - throw new NotImplementedException("Not currently implemented"); - } - - @Override - public Activity deserialize(com.google.api.services.plus.model.Activity gplusActivity) { - Activity activity = new Activity(); - - GooglePlusActivityUtil.updateActivity(gplusActivity, activity); - return activity; - } - - @Override - public List<Activity> deserializeAll(List<com.google.api.services.plus.model.Activity> serializedList) { - throw new NotImplementedException("Not currently implemented"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java new file mode 100644 index 0000000..4991e94 --- 
/dev/null +++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.google.gplus.provider; + +import com.google.gplus.serializer.util.GooglePlusActivityUtil; +import org.apache.commons.lang.NotImplementedException; +import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.pojo.json.Activity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +
+/**
+ * Converts Google+ API activities into Streams {@link Activity} objects by delegating to
+ * {@link GooglePlusActivityUtil#updateActivity}. The reverse direction and the bulk
+ * variant throw {@link NotImplementedException}.
+ */
+public class GPlusActivitySerializer implements ActivitySerializer<com.google.api.services.plus.model.Activity> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(GPlusActivitySerializer.class);
+
+    /** Owning provider, if one was supplied; not read by any method of this class. */
+    AbstractGPlusProvider provider;
+
+    /** Creates a serializer that is not bound to a provider. */
+    public GPlusActivitySerializer() {
+    }
+
+    /**
+     * Creates a serializer bound to the given provider.
+     *
+     * @param provider provider to remember
+     */
+    public GPlusActivitySerializer(AbstractGPlusProvider provider) {
+        this.provider = provider;
+    }
+
+    @Override
+    public String serializationFormat() {
+        return "gplus.v1";
+    }
+
+    /** Not supported; always throws. */
+    @Override
+    public com.google.api.services.plus.model.Activity serialize(Activity deserialized) {
+        throw new NotImplementedException("Not currently implemented");
+    }
+
+    /**
+     * Copies the Google+ activity into a freshly created Streams activity.
+     *
+     * @param gplusActivity source activity from the Google+ API
+     * @return the new Streams activity after {@code updateActivity} has filled it in
+     */
+    @Override
+    public Activity deserialize(com.google.api.services.plus.model.Activity gplusActivity) {
+        final Activity converted = new Activity();
+        GooglePlusActivityUtil.updateActivity(gplusActivity, converted);
+        return converted;
+    }
+
+    /** Not supported; always throws. */
+    @Override
+    public List<Activity> deserializeAll(List<com.google.api.services.plus.model.Activity> serializedList) {
+        throw new NotImplementedException("Not currently implemented");
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusEventProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusEventProcessor.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusEventProcessor.java new file mode 100644 index 0000000..6ed2ae1 --- /dev/null +++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusEventProcessor.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package com.google.gplus.provider; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.pojo.json.Activity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Queue; +import java.util.Random; +import java.util.concurrent.BlockingQueue; + +public class GPlusEventProcessor implements Runnable { + + private final static Logger LOGGER = LoggerFactory.getLogger(GPlusEventProcessor.class); + + private ObjectMapper mapper = new ObjectMapper(); + + private BlockingQueue<String> inQueue; + private Queue<StreamsDatum> outQueue; + + private Class inClass; + private Class outClass; + + private GPlusActivitySerializer gPlusActivitySerializer = new GPlusActivitySerializer(); + + public final static String TERMINATE = new String("TERMINATE"); + + public GPlusEventProcessor(BlockingQueue<String> inQueue, Queue<StreamsDatum> outQueue, Class inClass, Class outClass) { + this.inQueue = inQueue; + this.outQueue = outQueue; + this.inClass = inClass; + this.outClass = outClass; + } + + public GPlusEventProcessor(BlockingQueue<String> inQueue, Queue<StreamsDatum> outQueue, Class outClass) { + this.inQueue = inQueue; + this.outQueue = outQueue; + this.outClass = outClass; + } + + @Override + public void run() { + + while(true) { + try { + String item = inQueue.take(); + Thread.sleep(new Random().nextInt(100)); + if(item==TERMINATE) { + LOGGER.info("Terminating!"); + break; + } + + // first check for valid json + ObjectNode node = (ObjectNode)mapper.readTree(item); + + // if the target is string, just pass-through + if( String.class.equals(outClass)) + outQueue.offer(new StreamsDatum(item)); + else { + // convert to desired format + com.google.api.services.plus.model.Activity gplusActivity = (com.google.api.services.plus.model.Activity)mapper.readValue(item, com.google.api.services.plus.model.Activity.class); + + Activity 
streamsActivity = gPlusActivitySerializer.deserialize(gplusActivity); + + outQueue.offer(new StreamsDatum(streamsActivity)); + } + + } catch (Exception e) { + e.printStackTrace(); + } + } + } + +}; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java index 6acbbdb..f0101fd 100644 --- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java +++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java @@ -21,7 +21,7 @@ package org.apache.streams.instagram.processor; import com.google.common.collect.Lists; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.instagram.serializer.InstagramUserInfoConverter; +import org.apache.streams.instagram.serializer.InstagramUserInfoSerializer; import org.apache.streams.instagram.serializer.util.InstagramActivityUtil; import org.apache.streams.pojo.json.Activity; import org.jinstagram.entity.users.basicinfo.UserInfoData; @@ -32,10 +32,6 @@ import org.slf4j.LoggerFactory; import java.util.List; import java.util.Queue; -@Deprecated -/* - * Modules and streams should adopt TypeConverterProcessor and ActivityConverterProcessor - */ public class InstagramTypeConverter implements StreamsProcessor { public final static String STREAMS_ID = "InstagramTypeConverter"; @@ -46,7 +42,7 @@ public class InstagramTypeConverter implements StreamsProcessor { private 
Queue<StreamsDatum> outQueue; private InstagramActivityUtil instagramActivityUtil; - private InstagramUserInfoConverter userInfoSerializer; + private InstagramUserInfoSerializer userInfoSerializer; private int count = 0; @@ -103,7 +99,7 @@ public class InstagramTypeConverter implements StreamsProcessor { @Override public void prepare(Object o) { instagramActivityUtil = new InstagramActivityUtil(); - this.userInfoSerializer = new InstagramUserInfoConverter(); + this.userInfoSerializer = new InstagramUserInfoSerializer(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramJsonActivityConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramJsonActivityConverter.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramJsonActivityConverter.java deleted file mode 100644 index 7b9dd09..0000000 --- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramJsonActivityConverter.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.instagram.serializer; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.commons.lang.NotImplementedException; -import org.apache.streams.data.ActivityConverter; -import org.apache.streams.exceptions.ActivitySerializerException; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.pojo.json.Activity; -import org.jinstagram.entity.users.feed.MediaFeedData; - -import java.io.IOException; -import java.io.Serializable; -import java.util.List; - -import static org.apache.streams.instagram.serializer.util.InstagramActivityUtil.updateActivity; - -public class InstagramJsonActivityConverter implements ActivityConverter<String>, Serializable -{ - - public InstagramJsonActivityConverter() { - - } - - @Override - public String serializationFormat() { - return null; - } - - @Override - public String serialize(Activity deserialized) throws ActivitySerializerException { - throw new NotImplementedException(); - } - - @Override - public Activity deserialize(String serialized) throws ActivitySerializerException { - - ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - MediaFeedData mediaFeedData = null; - - try { - mediaFeedData = mapper.readValue(serialized, MediaFeedData.class); - } catch (JsonProcessingException e) { - e.printStackTrace(); - } catch (IOException e) { - e.printStackTrace(); - } - - Activity activity = new Activity(); - - updateActivity(mediaFeedData, activity); - - return activity; - } - - @Override - public List<Activity> deserializeAll(List<String> serializedList) { - throw new NotImplementedException(); - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramJsonActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramJsonActivitySerializer.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramJsonActivitySerializer.java new file mode 100644 index 0000000..c5bbdf1 --- /dev/null +++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramJsonActivitySerializer.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.streams.instagram.serializer; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang.NotImplementedException; +import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.exceptions.ActivitySerializerException; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.pojo.json.Activity; +import org.jinstagram.entity.users.feed.MediaFeedData; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +import static org.apache.streams.instagram.serializer.util.InstagramActivityUtil.updateActivity; + +public class InstagramJsonActivitySerializer implements ActivitySerializer<String>, Serializable +{ + + public InstagramJsonActivitySerializer() { + + } + + @Override + public String serializationFormat() { + return null; + } + + @Override + public String serialize(Activity deserialized) throws ActivitySerializerException { + throw new NotImplementedException(); + } + + @Override + public Activity deserialize(String serialized) throws ActivitySerializerException { + + ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + MediaFeedData mediaFeedData = null; + + try { + mediaFeedData = mapper.readValue(serialized, MediaFeedData.class); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + + Activity activity = new Activity(); + + updateActivity(mediaFeedData, activity); + + return activity; + } + + @Override + public List<Activity> deserializeAll(List<String> serializedList) { + throw new NotImplementedException(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoConverter.java ---------------------------------------------------------------------- diff --git 
a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoConverter.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoConverter.java deleted file mode 100644 index 95456c1..0000000 --- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoConverter.java +++ /dev/null @@ -1,81 +0,0 @@ -package org.apache.streams.instagram.serializer; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.streams.data.ActivityConverter; -import org.apache.streams.exceptions.ActivitySerializerException; -import org.apache.streams.pojo.json.Activity; -import org.apache.streams.pojo.json.Actor; -import org.apache.streams.pojo.json.Image; -import org.apache.streams.pojo.json.Provider; -import org.jinstagram.entity.users.basicinfo.UserInfoData; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import sun.reflect.generics.reflectiveObjects.NotImplementedException; - -import java.util.List; -import java.util.Map; - -/** - * - */ -public class InstagramUserInfoConverter implements ActivityConverter<UserInfoData> { - - private static final Logger LOGGER = LoggerFactory.getLogger(InstagramUserInfoConverter.class); - - private static final String STREAMS_ID_PREFIX = "id:instagram:"; - private static final String PROVIDER_ID = "id:provider:instagram"; - private static final String DISPLAY_NAME = "Instagram"; - - @Override - public String serializationFormat() { - return null; - } - - @Override - public UserInfoData serialize(Activity deserialized) throws ActivitySerializerException { - throw new NotImplementedException(); - } - - @Override - public Activity deserialize(UserInfoData serialized) throws ActivitySerializerException { - Activity activity = new Activity(); - Provider 
provider = new Provider(); - provider.setId(PROVIDER_ID); - provider.setDisplayName(DISPLAY_NAME); - activity.setProvider(provider); - activity.setPublished(DateTime.now().withZone(DateTimeZone.UTC)); - Actor actor = new Actor(); - Image image = new Image(); - image.setUrl(serialized.getProfile_picture()); - actor.setImage(image); - actor.setId(STREAMS_ID_PREFIX+serialized.getId()); - actor.setSummary(serialized.getBio()); - actor.setAdditionalProperty("handle", serialized.getUsername()); - actor.setDisplayName(serialized.getFullName()); - Map<String, Object> extensions = Maps.newHashMap(); - actor.setAdditionalProperty("extensions", extensions); - extensions.put("screenName", serialized.getUsername()); - extensions.put("posts", serialized.getCounts().getMedia()); - extensions.put("followers", serialized.getCounts().getFollwed_by()); - extensions.put("website", serialized.getWebsite()); - extensions.put("following", serialized.getCounts().getFollows()); - return activity; - } - - @Override - public List<Activity> deserializeAll(List<UserInfoData> serializedList) { - List<Activity> result = Lists.newLinkedList(); - for(UserInfoData data : serializedList) { - try { - result.add(deserialize(data)); - } catch (ActivitySerializerException ase) { - LOGGER.error("Caught ActivitySerializerException, dropping user info data : {}", data.getId()); - LOGGER.error("Exception : {}", ase); - } - } - return result; - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoSerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoSerializer.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoSerializer.java new file mode 100644 
index 0000000..055169b --- /dev/null +++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoSerializer.java @@ -0,0 +1,83 @@ +package org.apache.streams.instagram.serializer; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.exceptions.ActivitySerializerException; +import org.apache.streams.instagram.UsersInfo; +import org.apache.streams.instagram.provider.userinfo.InstagramUserInfoProvider; +import org.apache.streams.pojo.json.Activity; +import org.apache.streams.pojo.json.Actor; +import org.apache.streams.pojo.json.Image; +import org.apache.streams.pojo.json.Provider; +import org.jinstagram.entity.users.basicinfo.UserInfoData; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + +import java.util.List; +import java.util.Map; + +/** + * + */ +public class InstagramUserInfoSerializer implements ActivitySerializer<UserInfoData> { + + private static final Logger LOGGER = LoggerFactory.getLogger(InstagramUserInfoSerializer.class); + + private static final String STREAMS_ID_PREFIX = "id:instagram:"; + private static final String PROVIDER_ID = "id:provider:instagram"; + private static final String DISPLAY_NAME = "Instagram"; + + @Override + public String serializationFormat() { + return null; + } + + @Override + public UserInfoData serialize(Activity deserialized) throws ActivitySerializerException { + throw new NotImplementedException(); + } + + @Override + public Activity deserialize(UserInfoData serialized) throws ActivitySerializerException { + Activity activity = new Activity(); + Provider provider = new Provider(); + provider.setId(PROVIDER_ID); + provider.setDisplayName(DISPLAY_NAME); + activity.setProvider(provider); + 
activity.setPublished(DateTime.now().withZone(DateTimeZone.UTC)); + Actor actor = new Actor(); + Image image = new Image(); + image.setUrl(serialized.getProfile_picture()); + actor.setImage(image); + actor.setId(STREAMS_ID_PREFIX+serialized.getId()); + actor.setSummary(serialized.getBio()); + actor.setAdditionalProperty("handle", serialized.getUsername()); + actor.setDisplayName(serialized.getFullName()); + Map<String, Object> extensions = Maps.newHashMap(); + actor.setAdditionalProperty("extensions", extensions); + extensions.put("screenName", serialized.getUsername()); + extensions.put("posts", serialized.getCounts().getMedia()); + extensions.put("followers", serialized.getCounts().getFollwed_by()); + extensions.put("website", serialized.getWebsite()); + extensions.put("following", serialized.getCounts().getFollows()); + return activity; + } + + @Override + public List<Activity> deserializeAll(List<UserInfoData> serializedList) { + List<Activity> result = Lists.newLinkedList(); + for(UserInfoData data : serializedList) { + try { + result.add(deserialize(data)); + } catch (ActivitySerializerException ase) { + LOGGER.error("Caught ActivitySerializerException, dropping user info data : {}", data.getId()); + LOGGER.error("Exception : {}", ase); + } + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java new file mode 100644 index 0000000..2f2d677 --- /dev/null +++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java @@ -0,0 +1,93 @@ 
+/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.data; + +import com.fasterxml.jackson.databind.AnnotationIntrospector; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector; +import com.moreover.api.Article; +import org.apache.commons.lang.NotImplementedException; +import org.apache.streams.data.util.MoreoverUtils; +import org.apache.streams.pojo.json.Activity; + +import java.io.IOException; +import java.util.List; + +/** + * Deserializes Moreover JSON format into Activities + */ +public class MoreoverJsonActivitySerializer implements ActivitySerializer<String> { + + public MoreoverJsonActivitySerializer() { + } + + @Override + public String serializationFormat() { + return "application/json+vnd.moreover.com.v1"; + } + + @Override + public String serialize(Activity deserialized) { + throw new UnsupportedOperationException("Cannot currently serialize to Moreover JSON"); + } + + @Override + public Activity deserialize(String serialized) { + serialized = serialized.replaceAll("\\[[ ]*\\]", "null"); + + System.out.println(serialized); + + ObjectMapper mapper = 
new ObjectMapper(); + AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(mapper.getTypeFactory()); + mapper.setAnnotationIntrospector(introspector); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE); + mapper.configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, Boolean.FALSE); + mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE); + mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE); + mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.TRUE); + + Article article; + try { + ObjectNode node = (ObjectNode)mapper.readTree(serialized); + node.remove("tags"); + node.remove("locations"); + node.remove("companies"); + node.remove("topics"); + node.remove("media"); + node.remove("outboundUrls"); + ObjectNode jsonNodes = (ObjectNode) node.get("source").get("feed"); + jsonNodes.remove("editorialTopics"); + jsonNodes.remove("tags"); + jsonNodes.remove("autoTopics"); + article = mapper.convertValue(node, Article.class); + } catch (IOException e) { + throw new IllegalArgumentException("Unable to deserialize", e); + } + return MoreoverUtils.convert(article); + } + + @Override + public List<Activity> deserializeAll(List<String> serializedList) { + throw new NotImplementedException("Not currently implemented"); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java new file mode 100644 index 0000000..d60bcb8 --- /dev/null +++ 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.data; + +import com.moreover.api.Article; +import com.moreover.api.ArticlesResponse; +import com.moreover.api.ObjectFactory; +import org.apache.commons.lang.SerializationException; +import org.apache.streams.data.util.MoreoverUtils; +import org.apache.streams.pojo.json.Activity; + +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBElement; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Unmarshaller; +import java.io.StringReader; +import java.util.LinkedList; +import java.util.List; + +/** + * Deserializes the Moreover Article XML and converts it to an instance of {@link Activity} + */ +public class MoreoverXmlActivitySerializer implements ActivitySerializer<String> { + + //JAXBContext is threadsafe (supposedly) + private final JAXBContext articleContext; + private final JAXBContext articlesContext; + + public MoreoverXmlActivitySerializer() { + articleContext = createContext(Article.class); + articlesContext = createContext(ArticlesResponse.class); + } + + @Override + public String serializationFormat() { + return 
"application/xml+vnd.moreover.com.v1"; + } + + @Override + public String serialize(Activity deserialized) { + throw new UnsupportedOperationException("Cannot currently serialize to Moreover"); + } + + @Override + public Activity deserialize(String serialized) { + Article article = deserializeMoreover(serialized); + return MoreoverUtils.convert(article); + } + + @Override + public List<Activity> deserializeAll(List<String> serializedList) { + List<Activity> activities = new LinkedList<Activity>(); + for(String item : serializedList) { + ArticlesResponse response = deserializeMoreoverResponse(item); + for(Article article : response.getArticles().getArticle()) { + activities.add(MoreoverUtils.convert(article)); + } + } + return activities; + } + + private Article deserializeMoreover(String serialized){ + try { + Unmarshaller unmarshaller = articleContext.createUnmarshaller(); + return (Article) unmarshaller.unmarshal(new StringReader(serialized)); + } catch (JAXBException e) { + throw new SerializationException("Unable to deserialize Moreover data", e); + } + } + + private ArticlesResponse deserializeMoreoverResponse(String serialized){ + try { + Unmarshaller unmarshaller = articlesContext.createUnmarshaller(); + return ((JAXBElement<ArticlesResponse>) unmarshaller.unmarshal(new StringReader(serialized))).getValue(); + } catch (JAXBException e) { + throw new SerializationException("Unable to deserialize Moreover data", e); + } + } + + private JAXBContext createContext(Class articleClass) { + JAXBContext context; + try { + context = JAXBContext.newInstance(articleClass.getPackage().getName(), ObjectFactory.class.getClassLoader()); + } catch (JAXBException e) { + throw new IllegalStateException("Unable to create JAXB Context for Moreover data", e); + } + return context; + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverConverterResolver.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverConverterResolver.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverConverterResolver.java deleted file mode 100644 index 5c038ac..0000000 --- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverConverterResolver.java +++ /dev/null @@ -1,20 +0,0 @@ -package org.apache.streams.data.moreover.conversion; - -import com.moreover.Moreover; -import com.moreover.api.Article; -import org.apache.streams.data.ActivityConverterResolver; -import org.apache.streams.exceptions.ActivitySerializerException; - -/** - * Ensures moreover documents can be converted to Activity - */ -public class MoreoverConverterResolver implements ActivityConverterResolver { - @Override - public Class bestSerializer(Class documentClass) throws ActivitySerializerException { - if( documentClass == Moreover.class ) - return MoreoverJsonActivityConverter.class; - else if( documentClass == Article.class ) - return MoreoverJsonActivityConverter.class; - else return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverDocumentClassifier.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverDocumentClassifier.java 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverDocumentClassifier.java deleted file mode 100644 index 88ca5db..0000000 --- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverDocumentClassifier.java +++ /dev/null @@ -1,22 +0,0 @@ -package org.apache.streams.data.moreover.conversion; - -import com.google.common.base.Preconditions; -import com.moreover.Moreover; -import com.moreover.api.Article; -import org.apache.streams.data.DocumentClassifier; - -/** - * Ensures moreover documents can be converted to Activity - */ -public class MoreoverDocumentClassifier implements DocumentClassifier { - @Override - public Class detectClass(Object document) { - Preconditions.checkArgument(document instanceof String); - String string = (String) document; - if( string.startsWith("{") && string.endsWith("}") ) - return Moreover.class; - else if( string.startsWith("<") && string.endsWith(">") ) - return Article.class; - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverJsonActivityConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverJsonActivityConverter.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverJsonActivityConverter.java deleted file mode 100644 index a3070ae..0000000 --- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverJsonActivityConverter.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.data.moreover.conversion; - -import com.fasterxml.jackson.databind.AnnotationIntrospector; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector; -import com.moreover.api.Article; -import org.apache.commons.lang.NotImplementedException; -import org.apache.streams.data.ActivityConverter; -import org.apache.streams.data.util.MoreoverUtils; -import org.apache.streams.pojo.json.Activity; - -import java.io.IOException; -import java.util.List; - -/** - * Deserializes Moreover JSON format into Activities - */ -public class MoreoverJsonActivityConverter implements ActivityConverter<String> { - - public MoreoverJsonActivityConverter() { - } - - @Override - public String serializationFormat() { - return "application/json+vnd.moreover.com.v1"; - } - - @Override - public String serialize(Activity deserialized) { - throw new UnsupportedOperationException("Cannot currently serialize to Moreover JSON"); - } - - @Override - public Activity deserialize(String serialized) { - serialized = serialized.replaceAll("\\[[ ]*\\]", "null"); - - System.out.println(serialized); - - ObjectMapper mapper = new ObjectMapper(); - 
AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(mapper.getTypeFactory()); - mapper.setAnnotationIntrospector(introspector); - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE); - mapper.configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, Boolean.FALSE); - mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE); - mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE); - mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.TRUE); - - Article article; - try { - ObjectNode node = (ObjectNode)mapper.readTree(serialized); - node.remove("tags"); - node.remove("locations"); - node.remove("companies"); - node.remove("topics"); - node.remove("media"); - node.remove("outboundUrls"); - ObjectNode jsonNodes = (ObjectNode) node.get("source").get("feed"); - jsonNodes.remove("editorialTopics"); - jsonNodes.remove("tags"); - jsonNodes.remove("autoTopics"); - article = mapper.convertValue(node, Article.class); - } catch (IOException e) { - throw new IllegalArgumentException("Unable to deserialize", e); - } - return MoreoverUtils.convert(article); - } - - @Override - public List<Activity> deserializeAll(List<String> serializedList) { - throw new NotImplementedException("Not currently implemented"); - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverXmlActivityConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverXmlActivityConverter.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverXmlActivityConverter.java deleted file mode 100644 index 1f92a9d..0000000 --- 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverXmlActivityConverter.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.data.moreover.conversion; - -import com.moreover.api.Article; -import com.moreover.api.ArticlesResponse; -import com.moreover.api.ObjectFactory; -import org.apache.commons.lang.SerializationException; -import org.apache.streams.data.ActivityConverter; -import org.apache.streams.data.util.MoreoverUtils; -import org.apache.streams.pojo.json.Activity; - -import javax.xml.bind.JAXBContext; -import javax.xml.bind.JAXBElement; -import javax.xml.bind.JAXBException; -import javax.xml.bind.Unmarshaller; -import java.io.StringReader; -import java.util.LinkedList; -import java.util.List; - -/** - * Deserializes the Moreover Article XML and converts it to an instance of {@link Activity} - */ -public class MoreoverXmlActivityConverter implements ActivityConverter<String> { - - //JAXBContext is threadsafe (supposedly) - private final JAXBContext articleContext; - private final JAXBContext articlesContext; - - public MoreoverXmlActivityConverter() { - articleContext = createContext(Article.class); - articlesContext = 
createContext(ArticlesResponse.class); - } - - @Override - public String serializationFormat() { - return "application/xml+vnd.moreover.com.v1"; - } - - @Override - public String serialize(Activity deserialized) { - throw new UnsupportedOperationException("Cannot currently serialize to Moreover"); - } - - @Override - public Activity deserialize(String serialized) { - Article article = deserializeMoreover(serialized); - return MoreoverUtils.convert(article); - } - - @Override - public List<Activity> deserializeAll(List<String> serializedList) { - List<Activity> activities = new LinkedList<Activity>(); - for(String item : serializedList) { - ArticlesResponse response = deserializeMoreoverResponse(item); - for(Article article : response.getArticles().getArticle()) { - activities.add(MoreoverUtils.convert(article)); - } - } - return activities; - } - - private Article deserializeMoreover(String serialized){ - try { - Unmarshaller unmarshaller = articleContext.createUnmarshaller(); - return (Article) unmarshaller.unmarshal(new StringReader(serialized)); - } catch (JAXBException e) { - throw new SerializationException("Unable to deserialize Moreover data", e); - } - } - - private ArticlesResponse deserializeMoreoverResponse(String serialized){ - try { - Unmarshaller unmarshaller = articlesContext.createUnmarshaller(); - return ((JAXBElement<ArticlesResponse>) unmarshaller.unmarshal(new StringReader(serialized))).getValue(); - } catch (JAXBException e) { - throw new SerializationException("Unable to deserialize Moreover data", e); - } - } - - private JAXBContext createContext(Class articleClass) { - JAXBContext context; - try { - context = JAXBContext.newInstance(articleClass.getPackage().getName(), ObjectFactory.class.getClassLoader()); - } catch (JAXBException e) { - throw new IllegalStateException("Unable to create JAXB Context for Moreover data", e); - } - return context; - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivityConverterTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivityConverterTest.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivityConverterTest.java deleted file mode 100644 index 01340f6..0000000 --- a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivityConverterTest.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.streams.data; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.streams.data.moreover.conversion.MoreoverJsonActivityConverter; -import org.apache.streams.data.util.JsonUtil; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; - -import static org.apache.streams.data.util.MoreoverTestUtil.test; -import static org.junit.Assert.assertThat; - -public class MoreoverJsonActivityConverterTest { - JsonNode json; - ActivityConverter serializer = new MoreoverJsonActivityConverter(); - ObjectMapper mapper; - - @Before - public void setup() throws IOException { - json = JsonUtil.getFromFile("classpath:org/apache/streams/data/moreover.json"); - - mapper = new ObjectMapper(); - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE); - mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE); - mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE); - } - - @Test - public void loadData() throws Exception { - for (JsonNode item : json) { - test(serializer.deserialize(getString(item))); - } - } - - - private String getString(JsonNode jsonNode) { - try { - return new ObjectMapper().writeValueAsString(jsonNode); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivitySerializerTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivitySerializerTest.java 
b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivitySerializerTest.java new file mode 100644 index 0000000..f5d66b1 --- /dev/null +++ b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivitySerializerTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.streams.data; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.streams.data.util.JsonUtil; +import org.apache.streams.pojo.json.Activity; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.*; +import java.util.regex.Pattern; + +import static java.util.regex.Pattern.matches; +import static org.apache.streams.data.util.MoreoverTestUtil.test; +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.assertThat; + +public class MoreoverJsonActivitySerializerTest { + JsonNode json; + ActivitySerializer serializer = new MoreoverJsonActivitySerializer(); + ObjectMapper mapper; + + @Before + public void setup() throws IOException { + json = JsonUtil.getFromFile("classpath:org/apache/streams/data/moreover.json"); + + mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE); + mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE); + mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE); + } + + @Test + public void loadData() throws Exception { + for (JsonNode item : json) { + test(serializer.deserialize(getString(item))); + } + } + + + private String getString(JsonNode jsonNode) { + try { + return new ObjectMapper().writeValueAsString(jsonNode); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivityConverterTest.java 
---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivityConverterTest.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivityConverterTest.java deleted file mode 100644 index e8afc5d..0000000 --- a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivityConverterTest.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.streams.data; - - -import com.google.common.collect.Lists; -import org.apache.commons.io.IOUtils; -import org.apache.streams.data.moreover.conversion.MoreoverXmlActivityConverter; -import org.apache.streams.pojo.json.Activity; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.io.InputStream; -import java.io.StringWriter; -import java.nio.charset.Charset; -import java.util.List; - -import static org.apache.streams.data.util.MoreoverTestUtil.test; - -public class MoreoverXmlActivityConverterTest { - ActivityConverter serializer; - private String xml; - - @Before - public void setup() throws IOException { - serializer = new MoreoverXmlActivityConverter(); - xml = loadXml(); - } - - @Test - public void loadData() throws Exception { - List<Activity> activities = serializer.deserializeAll(Lists.newArrayList(xml)); - for (Activity activity : activities) { - test(activity); - } - } - - private String loadXml() throws IOException { - StringWriter writer = new StringWriter(); - InputStream resourceAsStream = this.getClass().getResourceAsStream("moreover.xml"); - IOUtils.copy(resourceAsStream, writer, Charset.forName("UTF-8")); - return writer.toString(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerTest.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerTest.java new file mode 100644 index 0000000..dbebee2 --- /dev/null +++ b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.data; + + +import com.google.common.collect.Lists; +import org.apache.commons.io.IOUtils; +import org.apache.streams.pojo.json.Activity; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.io.StringWriter; +import java.nio.charset.Charset; +import java.util.List; + +import static org.apache.streams.data.util.MoreoverTestUtil.test; + +public class MoreoverXmlActivitySerializerTest { + ActivitySerializer serializer; + private String xml; + + @Before + public void setup() throws IOException { + serializer = new MoreoverXmlActivitySerializer(); + xml = loadXml(); + } + + @Test + public void loadData() throws Exception { + List<Activity> activities = serializer.deserializeAll(Lists.newArrayList(xml)); + for (Activity activity : activities) { + test(activity); + } + } + + private String loadXml() throws IOException { + StringWriter writer = new StringWriter(); + InputStream resourceAsStream = this.getClass().getResourceAsStream("moreover.xml"); + IOUtils.copy(resourceAsStream, writer, Charset.forName("UTF-8")); + return writer.toString(); + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java index fbd16f4..339b922 100644 --- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java +++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java @@ -24,7 +24,7 @@ import org.apache.commons.lang.NotImplementedException; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; import org.apache.streams.pojo.json.Activity; -import org.apache.streams.rss.serializer.SyndEntryActivityConverter; +import org.apache.streams.rss.serializer.SyndEntryActivitySerializer; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.slf4j.Logger; @@ -32,18 +32,14 @@ import org.slf4j.LoggerFactory; import java.util.List; -@Deprecated /** * Converts ObjectNode representations of Rome SyndEntries to activities. 
- * Deprecated: Modules and streams should adopt TypeConverterProcessor and ActivityConverterProcessor - * TODO: Have RSS Provider always output ObjectNode, place ActivityConverterProcessor afterward with RssDocumentClassifier and RssConverterResolver available - * TODO: Refactor tests and examples */ public class RssTypeConverter implements StreamsProcessor{ private static final Logger LOGGER = LoggerFactory.getLogger(RssTypeConverter.class); - private SyndEntryActivityConverter serializer; + private SyndEntryActivitySerializer serializer; private int successCount = 0; private int failCount = 0; @@ -66,7 +62,7 @@ public class RssTypeConverter implements StreamsProcessor{ @Override public void prepare(Object o) { - this.serializer = new SyndEntryActivityConverter(); + this.serializer = new SyndEntryActivitySerializer(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssDocumentClassifier.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssDocumentClassifier.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssDocumentClassifier.java deleted file mode 100644 index 1e19eb4..0000000 --- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssDocumentClassifier.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.rss.provider; - -import com.sun.syndication.feed.synd.SyndEntry; -import org.apache.streams.data.DocumentClassifier; - -/** - * Ensures rss documents can be converted to Activity - */ -public class RssDocumentClassifier implements DocumentClassifier { - - @Override - public Class detectClass(Object document) { - if( document instanceof SyndEntry ) - return SyndEntry.class; - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventClassifier.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventClassifier.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventClassifier.java new file mode 100644 index 0000000..4e6efee --- /dev/null +++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventClassifier.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.rss.provider; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.sun.syndication.feed.synd.SyndEntry; + +/** + * Created by sblackmon on 12/13/13. + */ +public class RssEventClassifier { + + public static Class detectClass( ObjectNode bean ) { + return SyndEntry.class; + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java new file mode 100644 index 0000000..75d275d --- /dev/null +++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.rss.provider; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.sun.syndication.feed.synd.SyndEntry; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.pojo.json.Activity; +import org.apache.streams.rss.serializer.SyndEntryActivitySerializer; +import org.apache.streams.rss.serializer.SyndEntrySerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Queue; +import java.util.Random; + +public class RssEventProcessor implements Runnable { + + private final static Logger LOGGER = LoggerFactory.getLogger(RssEventProcessor.class); + + private ObjectMapper mapper = new ObjectMapper(); + + private Queue<SyndEntry> inQueue; + private Queue<StreamsDatum> outQueue; + + private Class inClass; + private Class outClass; + + private SyndEntryActivitySerializer syndEntryActivitySerializer = new SyndEntryActivitySerializer(); + private SyndEntrySerializer syndEntrySerializer = new SyndEntrySerializer(); + + public final static String TERMINATE = new String("TERMINATE"); + + public RssEventProcessor(Queue<SyndEntry> inQueue, Queue<StreamsDatum> outQueue, Class inClass, Class outClass) { + this.inQueue = inQueue; + this.outQueue = outQueue; + this.inClass = inClass; + this.outClass = outClass; + } + + public RssEventProcessor(Queue<SyndEntry> inQueue, Queue<StreamsDatum> outQueue, Class outClass) { + 
this.inQueue = inQueue; + this.outQueue = outQueue; + this.outClass = outClass; + } + + @Override + public void run() { + + while(true) { + Object item; + try { + item = inQueue.poll(); + if(item instanceof String && item.equals(TERMINATE)) { + LOGGER.info("Terminating!"); + break; + } + + Thread.sleep(new Random().nextInt(100)); + + // if the target is string, just pass-through + if( String.class.equals(outClass)) + outQueue.offer(new StreamsDatum(item.toString())); + else if( SyndEntry.class.equals(outClass)) + { + outQueue.offer(new StreamsDatum(item)); + } + else if( Activity.class.equals(outClass)) + { + // convert to desired format + SyndEntry entry = (SyndEntry)item; + if( entry != null ) { + Activity out = syndEntryActivitySerializer.deserialize(this.syndEntrySerializer.deserialize((SyndEntry)item)); + + if( out != null ) + outQueue.offer(new StreamsDatum(out)); + } + } + + } catch (Exception e) { + e.printStackTrace(); + } + } + } + +}; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/RssConverterResolver.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/RssConverterResolver.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/RssConverterResolver.java deleted file mode 100644 index 489ffec..0000000 --- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/RssConverterResolver.java +++ /dev/null @@ -1,18 +0,0 @@ -package org.apache.streams.rss.serializer; - -import com.sun.syndication.feed.synd.SyndEntry; -import org.apache.streams.data.ActivityConverterResolver; -import org.apache.streams.exceptions.ActivitySerializerException; - -/** - * Ensures rss documents can be converted to Activity - */ -public class RssConverterResolver implements 
ActivityConverterResolver { - - @Override - public Class bestSerializer(Class documentClass) throws ActivitySerializerException { - if( documentClass == SyndEntry.class ) - return SyndEntryActivityConverter.class; - return null; - } -}
