Level up Moreover provider: add main methods to each provider (STREAMS-412); add real integration tests (STREAMS-415); reorganize packages.
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/015f0ae1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/015f0ae1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/015f0ae1 Branch: refs/heads/master Commit: 015f0ae1eb8b662889e0fa544607c28e25b0dc38 Parents: 11e3a0f Author: Steve Blackmon @steveblackmon <[email protected]> Authored: Fri Oct 21 11:04:47 2016 -0500 Committer: Steve Blackmon @steveblackmon <[email protected]> Committed: Fri Oct 21 11:04:47 2016 -0500 ---------------------------------------------------------------------- .../data/MoreoverJsonActivitySerializer.java | 97 --------- .../data/MoreoverXmlActivitySerializer.java | 105 ---------- .../streams/data/moreover/MoreoverClient.java | 110 ---------- .../data/moreover/MoreoverConfigurator.java | 58 ------ .../streams/data/moreover/MoreoverProvider.java | 114 ----------- .../data/moreover/MoreoverProviderTask.java | 94 --------- .../streams/data/moreover/MoreoverResult.java | 204 ------------------- .../data/moreover/MoreoverResultSetWrapper.java | 34 ---- .../apache/streams/data/util/MoreoverUtils.java | 155 -------------- .../apache/streams/moreover/MoreoverClient.java | 110 ++++++++++ .../streams/moreover/MoreoverConfigurator.java | 56 +++++ .../MoreoverJsonActivitySerializer.java | 98 +++++++++ .../streams/moreover/MoreoverProvider.java | 177 ++++++++++++++++ .../streams/moreover/MoreoverProviderTask.java | 91 +++++++++ .../apache/streams/moreover/MoreoverResult.java | 200 ++++++++++++++++++ .../moreover/MoreoverResultSetWrapper.java | 32 +++ .../apache/streams/moreover/MoreoverUtils.java | 155 ++++++++++++++ .../moreover/MoreoverXmlActivitySerializer.java | 106 ++++++++++ .../src/main/resources/moreover.conf | 25 --- .../data/MoreoverJsonActivitySerializerIT.java | 79 ------- .../data/MoreoverXmlActivitySerializerIT.java | 65 ------ 
.../streams/data/util/MoreoverTestUtil.java | 43 ---- .../streams/moreover/MoreoverTestUtil.java | 43 ++++ .../test/MoreoverJsonActivitySerializerIT.java | 76 +++++++ .../test/MoreoverXmlActivitySerializerIT.java | 66 ++++++ .../test/provider/MoreoverProviderIT.java | 67 ++++++ .../src/test/resources/moreover.conf | 25 +++ 27 files changed, 1302 insertions(+), 1183 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java deleted file mode 100644 index ae48b41..0000000 --- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.streams.data; - -import com.fasterxml.jackson.databind.AnnotationIntrospector; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector; -import com.moreover.api.Article; -import org.apache.commons.lang.NotImplementedException; -import org.apache.streams.data.util.MoreoverUtils; -import org.apache.streams.pojo.json.Activity; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.List; - -/** - * Deserializes Moreover JSON format into Activities - */ -public class MoreoverJsonActivitySerializer implements ActivitySerializer<String> { - - private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverJsonActivitySerializer.class); - - public MoreoverJsonActivitySerializer() { - } - - @Override - public String serializationFormat() { - return "application/json+vnd.moreover.com.v1"; - } - - @Override - public String serialize(Activity deserialized) { - throw new UnsupportedOperationException("Cannot currently serialize to Moreover JSON"); - } - - @Override - public Activity deserialize(String serialized) { - serialized = serialized.replaceAll("\\[[ ]*\\]", "null"); - - LOGGER.debug(serialized); - - ObjectMapper mapper = new ObjectMapper(); - AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(mapper.getTypeFactory()); - mapper.setAnnotationIntrospector(introspector); - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE); - mapper.configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, Boolean.FALSE); - mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE); - mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE); - mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.TRUE); - - 
Article article; - try { - ObjectNode node = (ObjectNode)mapper.readTree(serialized); - node.remove("tags"); - node.remove("locations"); - node.remove("companies"); - node.remove("topics"); - node.remove("media"); - node.remove("outboundUrls"); - ObjectNode jsonNodes = (ObjectNode) node.get("source").get("feed"); - jsonNodes.remove("editorialTopics"); - jsonNodes.remove("tags"); - jsonNodes.remove("autoTopics"); - article = mapper.convertValue(node, Article.class); - } catch (IOException e) { - throw new IllegalArgumentException("Unable to deserialize", e); - } - return MoreoverUtils.convert(article); - } - - @Override - public List<Activity> deserializeAll(List<String> serializedList) { - throw new NotImplementedException("Not currently implemented"); - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java deleted file mode 100644 index d60bcb8..0000000 --- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.data; - -import com.moreover.api.Article; -import com.moreover.api.ArticlesResponse; -import com.moreover.api.ObjectFactory; -import org.apache.commons.lang.SerializationException; -import org.apache.streams.data.util.MoreoverUtils; -import org.apache.streams.pojo.json.Activity; - -import javax.xml.bind.JAXBContext; -import javax.xml.bind.JAXBElement; -import javax.xml.bind.JAXBException; -import javax.xml.bind.Unmarshaller; -import java.io.StringReader; -import java.util.LinkedList; -import java.util.List; - -/** - * Deserializes the Moreover Article XML and converts it to an instance of {@link Activity} - */ -public class MoreoverXmlActivitySerializer implements ActivitySerializer<String> { - - //JAXBContext is threadsafe (supposedly) - private final JAXBContext articleContext; - private final JAXBContext articlesContext; - - public MoreoverXmlActivitySerializer() { - articleContext = createContext(Article.class); - articlesContext = createContext(ArticlesResponse.class); - } - - @Override - public String serializationFormat() { - return "application/xml+vnd.moreover.com.v1"; - } - - @Override - public String serialize(Activity deserialized) { - throw new UnsupportedOperationException("Cannot currently serialize to Moreover"); - } - - @Override - public Activity deserialize(String serialized) { - Article article = deserializeMoreover(serialized); - return MoreoverUtils.convert(article); - } - - @Override - 
public List<Activity> deserializeAll(List<String> serializedList) { - List<Activity> activities = new LinkedList<Activity>(); - for(String item : serializedList) { - ArticlesResponse response = deserializeMoreoverResponse(item); - for(Article article : response.getArticles().getArticle()) { - activities.add(MoreoverUtils.convert(article)); - } - } - return activities; - } - - private Article deserializeMoreover(String serialized){ - try { - Unmarshaller unmarshaller = articleContext.createUnmarshaller(); - return (Article) unmarshaller.unmarshal(new StringReader(serialized)); - } catch (JAXBException e) { - throw new SerializationException("Unable to deserialize Moreover data", e); - } - } - - private ArticlesResponse deserializeMoreoverResponse(String serialized){ - try { - Unmarshaller unmarshaller = articlesContext.createUnmarshaller(); - return ((JAXBElement<ArticlesResponse>) unmarshaller.unmarshal(new StringReader(serialized))).getValue(); - } catch (JAXBException e) { - throw new SerializationException("Unable to deserialize Moreover data", e); - } - } - - private JAXBContext createContext(Class articleClass) { - JAXBContext context; - try { - context = JAXBContext.newInstance(articleClass.getPackage().getName(), ObjectFactory.class.getClassLoader()); - } catch (JAXBException e) { - throw new IllegalStateException("Unable to create JAXB Context for Moreover data", e); - } - return context; - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java deleted file mode 100644 index 9ab60c5..0000000 --- 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.data.moreover; - -import org.apache.commons.io.IOUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.StringWriter; -import java.math.BigInteger; -import java.net.HttpURLConnection; -import java.net.URL; -import java.nio.charset.Charset; -import java.util.Date; - -/** - * - */ -public class MoreoverClient { - private static final Logger logger = LoggerFactory.getLogger(MoreoverClient.class); - - private static final String BASE_URL = "http://metabase.moreover.com/api/v10/articles?key=%s&limit=%s&sequence_id=%s"; - private final String id; - private String apiKey; - private BigInteger lastSequenceId = BigInteger.ZERO; - //testing purpose only - public long pullTime; - private boolean debug; - - public MoreoverClient(String id, String apiKey, String sequence) { - logger.info("Constructed new client for id:{} key:{} sequence:{}", id, apiKey, sequence); - this.id = id; - this.apiKey = apiKey; - this.lastSequenceId = new 
BigInteger(sequence); - } - - public MoreoverResult getArticlesAfter(String sequenceId, int limit) throws IOException { - String urlString = String.format(BASE_URL, this.apiKey, limit, sequenceId); - logger.debug("Making call to {}", urlString); - long start = System.nanoTime(); - MoreoverResult result = new MoreoverResult(id, getArticles(new URL(urlString)), start, System.nanoTime()); - if(!result.getMaxSequencedId().equals(BigInteger.ZERO)) - { - this.lastSequenceId = result.getMaxSequencedId(); - logger.debug("Maximum sequence from last call {}", this.lastSequenceId); - } - else - logger.debug("No maximum sequence returned in last call {}", this.lastSequenceId); - return result; - } - - public MoreoverResult getNextBatch() throws IOException{ - logger.debug("Getting next results for {} {} {}", this.id, this.apiKey, this.lastSequenceId); - return getArticlesAfter(this.lastSequenceId.toString(), 500); - } - - private String getArticles2(URL url) throws IOException { - HttpURLConnection cn = (HttpURLConnection) url.openConnection(); - cn.setRequestMethod("GET"); - cn.addRequestProperty("Content-Type", "text/xml;charset=UTF-8"); - cn.setDoInput(true); - cn.setDoOutput(false); - BufferedReader reader = new BufferedReader(new InputStreamReader(cn.getInputStream(), Charset.forName("UTF-8"))); - String line = null; - StringBuilder builder = new StringBuilder(); - String s = ""; - String result = new String(s.getBytes(Charset.forName("UTF-8")), Charset.forName("UTF-8")); - while((line = reader.readLine()) != null) { - result+=line; - } - pullTime = new Date().getTime(); - return result; - } - - private String getArticles(URL url) throws IOException{ - HttpURLConnection cn = (HttpURLConnection) url.openConnection(); - cn.setRequestMethod("GET"); - cn.addRequestProperty("Content-Type", "text/xml;charset=UTF-8"); - cn.setDoInput(true); - cn.setDoOutput(false); - StringWriter writer = new StringWriter(); - IOUtils.copy(new InputStreamReader(cn.getInputStream(), 
Charset.forName("UTF-8")), writer); - writer.flush(); - pullTime = new Date().getTime(); - - // added after seeing java.net.SocketException: Too many open files - cn.disconnect(); - - return writer.toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java deleted file mode 100644 index 4c3ba06..0000000 --- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.streams.data.moreover; - -import com.google.common.collect.Lists; -import com.typesafe.config.Config; -import org.apache.streams.moreover.MoreoverConfiguration; -import org.apache.streams.moreover.MoreoverKeyData; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -/** - * Created by sblackmon on 12/10/13. - */ -public class MoreoverConfigurator { - - private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverConfigurator.class); - - public static MoreoverConfiguration detectConfiguration(Config moreover) { - - MoreoverConfiguration moreoverConfiguration = new MoreoverConfiguration(); - - List<MoreoverKeyData> apiKeys = Lists.newArrayList(); - - Config apiKeysConfig = moreover.getConfig("apiKeys"); - - if( !apiKeysConfig.isEmpty()) - for( String apiKeyId : apiKeysConfig.root().keySet() ) { - Config apiKeyConfig = apiKeysConfig.getConfig(apiKeyId); - apiKeys.add(new MoreoverKeyData() - .withId(apiKeyConfig.getString("key")) - .withKey(apiKeyConfig.getString("key")) - .withStartingSequence(apiKeyConfig.getString("startingSequence"))); - } - moreoverConfiguration.setApiKeys(apiKeys); - - return moreoverConfiguration; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java deleted file mode 100644 index 1add4d2..0000000 --- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.data.moreover; - -import com.google.common.base.Predicates; -import com.google.common.collect.*; -import net.jcip.annotations.Immutable; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProvider; -import org.apache.streams.core.StreamsResultSet; -import org.apache.streams.moreover.MoreoverConfiguration; -import org.apache.streams.moreover.MoreoverKeyData; -import org.joda.time.DateTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.math.BigInteger; -import java.util.*; -import java.util.concurrent.*; - -public class MoreoverProvider implements StreamsProvider { - - public final static String STREAMS_ID = "MoreoverProvider"; - - private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverProvider.class); - - protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>(); - - private List<MoreoverKeyData> keys; - - private MoreoverConfiguration config; - - private ExecutorService executor; - - public MoreoverProvider(MoreoverConfiguration moreoverConfiguration) { - this.config = moreoverConfiguration; - this.keys = Lists.newArrayList(); - for( MoreoverKeyData apiKey : config.getApiKeys()) { - this.keys.add(apiKey); - } - } - - @Override - public String getId() { - return STREAMS_ID; - } 
- - public void startStream() { - - for(MoreoverKeyData key : keys) { - MoreoverProviderTask task = new MoreoverProviderTask(key.getId(), key.getKey(), this.providerQueue, key.getStartingSequence()); - executor.submit(new Thread(task)); - LOGGER.info("Started producer for {}", key.getKey()); - } - - } - - @Override - public synchronized StreamsResultSet readCurrent() { - - LOGGER.debug("readCurrent: {}", providerQueue.size()); - - Collection<StreamsDatum> currentIterator = Lists.newArrayList(); - Iterators.addAll(currentIterator, providerQueue.iterator()); - - StreamsResultSet current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator)); - - providerQueue.clear(); - - return current; - } - - @Override - public StreamsResultSet readNew(BigInteger sequence) { - return null; - } - - @Override - public StreamsResultSet readRange(DateTime start, DateTime end) { - return null; - } - - @Override - public boolean isRunning() { - return !executor.isShutdown() && !executor.isTerminated(); - } - - @Override - public void prepare(Object configurationObject) { - LOGGER.debug("Prepare"); - executor = Executors.newSingleThreadExecutor(); - } - - @Override - public void cleanUp() { - - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java deleted file mode 100644 index 2640bfd..0000000 --- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.data.moreover; - -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsResultSet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.math.BigInteger; -import java.util.Queue; - -/** - * Task to pull from the Morever API - */ -public class MoreoverProviderTask implements Runnable { - - public static final int LATENCY = 10; - public static final int REQUIRED_LATENCY = LATENCY * 1000; - private static Logger logger = LoggerFactory.getLogger(MoreoverProviderTask.class); - - private String lastSequence; - private final String apiKey; - private final String apiId; - private final Queue<StreamsDatum> results; - private final MoreoverClient moClient; - private boolean started = false; - - public MoreoverProviderTask(String apiId, String apiKey, Queue<StreamsDatum> results, String lastSequence) { - //logger.info("Constructed new task {} for {} {} {}", UUID.randomUUID().toString(), apiId, apiKey, lastSequence); - this.apiId = apiId; - this.apiKey = apiKey; - this.results = results; - this.lastSequence = lastSequence; - this.moClient = new MoreoverClient(this.apiId, this.apiKey, this.lastSequence); - initializeClient(moClient); - } - - 
@Override - public void run() { - while(true) { - try { - ensureTime(moClient); - MoreoverResult result = moClient.getArticlesAfter(lastSequence, 500); - started = true; - lastSequence = result.process().toString(); - for(StreamsDatum entry : ImmutableSet.copyOf(result.iterator())) - results.offer(entry); - logger.info("ApiKey={}\tlastSequenceid={}", this.apiKey, lastSequence); - - } catch (Exception e) { - logger.error("Exception while polling moreover", e); - } - } - } - - private void ensureTime(MoreoverClient moClient) { - try { - long gap = System.currentTimeMillis() - moClient.pullTime; - if (gap < REQUIRED_LATENCY) - Thread.sleep(REQUIRED_LATENCY - gap); - } catch (Exception e) { - logger.warn("Error sleeping for latency"); - } - } - - private void initializeClient(MoreoverClient moClient) { - try { - moClient.getArticlesAfter(this.lastSequence, 2); - } catch (Exception e) { - logger.error("Failed to start stream, {}", this.apiKey); - logger.error("Exception : ", e); - throw new IllegalStateException("Unable to initialize stream", e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java deleted file mode 100644 index 050026b..0000000 --- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.data.moreover; - -import com.fasterxml.aalto.stax.InputFactoryImpl; -import com.fasterxml.aalto.stax.OutputFactoryImpl; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.JsonMappingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.fasterxml.jackson.dataformat.xml.JacksonXmlModule; -import com.fasterxml.jackson.dataformat.xml.XmlFactory; -import com.fasterxml.jackson.dataformat.xml.XmlMapper; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.moreover.api.Article; -import com.moreover.api.ArticlesResponse; -import org.apache.streams.core.StreamsDatum; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.Serializable; -import java.math.BigInteger; -import java.util.Iterator; -import java.util.List; - - -public class MoreoverResult implements Iterable<StreamsDatum> { - - private static final Logger logger = LoggerFactory.getLogger(MoreoverResult.class); - - private ObjectMapper mapper; - private XmlMapper xmlMapper; - - private String xmlString; - private String jsonString; - private ArticlesResponse resultObject; - private ArticlesResponse.Articles articles; - private List<Article> articleArray; - private long start; - private long 
end; - private String clientId; - private BigInteger maxSequencedId = BigInteger.ZERO; - - protected ArticlesResponse response; - protected List<StreamsDatum> list = Lists.newArrayList(); - - protected MoreoverResult(String clientId, String xmlString, long start, long end) { - this.xmlString = xmlString; - this.clientId = clientId; - this.start = start; - this.end = end; - XmlFactory f = new XmlFactory(new InputFactoryImpl(), - new OutputFactoryImpl()); - - JacksonXmlModule module = new JacksonXmlModule(); - - module.setDefaultUseWrapper(false); - - xmlMapper = new XmlMapper(f, module); - - xmlMapper - .configure( - DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, - Boolean.TRUE); - xmlMapper - .configure( - DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, - Boolean.TRUE); - xmlMapper - .configure( - DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY, - Boolean.TRUE); - xmlMapper.configure( - DeserializationFeature.READ_ENUMS_USING_TO_STRING, - Boolean.TRUE); - xmlMapper.configure( - DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, - Boolean.FALSE); - - mapper = new ObjectMapper(); - - mapper - .configure( - DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, - Boolean.TRUE); - mapper.configure( - DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, - Boolean.TRUE); - mapper - .configure( - DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY, - Boolean.TRUE); - mapper.configure( - DeserializationFeature.READ_ENUMS_USING_TO_STRING, - Boolean.TRUE); - - } - - public String getClientId() { - return clientId; - } - - public long getStart() { - return start; - } - - public long getEnd() { - return end; - } - - public BigInteger process() { - - try { - this.resultObject = xmlMapper.readValue(xmlString, ArticlesResponse.class); - } catch (JsonMappingException e) { - // theory is this may not be fatal - this.resultObject = (ArticlesResponse) e.getPath().get(0).getFrom(); - } catch (Exception e) { - e.printStackTrace(); - logger.warn("Unable to 
process document:"); - logger.warn(xmlString); - } - - if( this.resultObject.getStatus().equals("FAILURE")) - { - logger.warn(this.resultObject.getStatus()); - logger.warn(this.resultObject.getMessageCode()); - logger.warn(this.resultObject.getUserMessage()); - logger.warn(this.resultObject.getDeveloperMessage()); - } - else - { - this.articles = resultObject.getArticles(); - this.articleArray = articles.getArticle(); - - for (Article article : articleArray) { - BigInteger sequenceid = new BigInteger(article.getSequenceId()); - list.add(new StreamsDatum(article, sequenceid)); - logger.trace("Prior max sequence Id {} current candidate {}", this.maxSequencedId, sequenceid); - if (sequenceid.compareTo(this.maxSequencedId) > 0) { - this.maxSequencedId = sequenceid; - } - } - } - - return this.maxSequencedId; - } - - public String getXmlString() { - return this.xmlString; - } - - public BigInteger getMaxSequencedId() { - return this.maxSequencedId; - } - - @Override - public Iterator<StreamsDatum> iterator() { - return list.iterator(); - } - - protected static class JsonStringIterator implements Iterator<Serializable> { - - private Iterator<Serializable> underlying; - - protected JsonStringIterator(Iterator<Serializable> underlying) { - this.underlying = underlying; - } - - @Override - public boolean hasNext() { - return underlying.hasNext(); - } - - @Override - public String next() { - return underlying.next().toString(); - } - - @Override - public void remove() { - underlying.remove(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResultSetWrapper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResultSetWrapper.java 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResultSetWrapper.java deleted file mode 100644 index b0f95a4..0000000 --- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResultSetWrapper.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.streams.data.moreover; - -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsResultSet; - -import java.math.BigInteger; -import java.util.Iterator; -import java.util.Queue; - -public class MoreoverResultSetWrapper extends StreamsResultSet { - - public MoreoverResultSetWrapper(MoreoverResult underlying) { - super((Queue<StreamsDatum>)underlying); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/util/MoreoverUtils.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/util/MoreoverUtils.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/util/MoreoverUtils.java deleted file mode 100644 index 0424875..0000000 --- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/util/MoreoverUtils.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.streams.data.util; - -import com.moreover.api.*; -import org.apache.streams.pojo.extensions.ExtensionUtil; -import org.apache.streams.pojo.json.Activity; -import org.apache.streams.pojo.json.ActivityObject; -import org.apache.streams.pojo.json.Actor; -import org.apache.streams.pojo.json.Provider; -import org.joda.time.DateTime; - -import java.text.DateFormat; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.*; - -import static org.apache.streams.data.util.ActivityUtil.*; - -/** - * Provides utilities for Moroever data - */ -public class MoreoverUtils { - private MoreoverUtils() { - } - - public static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'"; - - public static Activity convert(Article article) { - Activity activity = new Activity(); - Source source = article.getSource(); - activity.setActor(convert(article.getAuthor(), source.getName())); - activity.setProvider(convert(source)); - activity.setTarget(convertTarget(source)); - activity.setObject(convertObject(article)); - activity.setPublished(DateTime.parse(article.getPublishedDate())); - activity.setContent(article.getContent()); - activity.setTitle(article.getTitle()); - activity.setVerb("posted"); - fixActivityId(activity); - addLocationExtension(activity, source); - addLanguageExtension(activity, article); - activity.setLinks(convertLinks(article)); - return activity; - } - - private static void fixActivityId(Activity activity) { - if (activity.getId() != null && activity.getId().matches("\\{[a-z]*\\}")) { - activity.setId(null); - } - } - - private static List convertLinks(Article article) { - List list = new LinkedList(); - Article.OutboundUrls outboundUrls = article.getOutboundUrls(); - if (outboundUrls != null) { - for (String url : outboundUrls.getOutboundUrl()) { - list.add(url); - } - } - return list; - } - - public static ActivityObject convertTarget(Source source) { - ActivityObject object = new ActivityObject(); - 
object.setUrl(source.getHomeUrl()); - object.setDisplayName(source.getName()); - return object; - } - - public static ActivityObject convertObject(Article article) { - ActivityObject object = new ActivityObject(); - object.setContent(article.getContent()); - object.setSummary(article.getTitle()); - object.setUrl(article.getOriginalUrl()); - object.setObjectType(article.getDataFormat()); - String type = article.getDataFormat().equals("text") ? "article" : article.getDataFormat(); - object.setId(getObjectId(getProviderID(article.getSource().getFeed()), type, article.getId())); - object.setPublished(DateTime.parse(article.getPublishedDate())); - return object; - } - - public static Provider convert(Source source) { - Provider provider = new Provider(); - Feed feed = source.getFeed(); - String display = getProviderID(feed); - provider.setId(getProviderId(display.trim().toLowerCase().replace(" ", "_"))); - provider.setDisplayName(display); - provider.setUrl(feed.getUrl()); - return provider; - } - - public static Actor convert(Author author, String platformName) { - Actor actor = new Actor(); - AuthorPublishingPlatform platform = author.getPublishingPlatform(); - String userId = platform.getUserId(); - if (userId != null) actor.setId(getPersonId(getProviderID(platformName), userId)); - actor.setDisplayName(author.getName()); - actor.setUrl(author.getHomeUrl()); - actor.setSummary(author.getDescription()); - actor.setAdditionalProperty("email", author.getEmail()); - return actor; - } - - public static void addLocationExtension(Activity activity, Source value) { - Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity); - String country = value.getLocation().getCountryCode() == null ? 
value.getLocation().getCountry() : value.getLocation().getCountryCode(); - if (country != null) { - Map<String, Object> location = new HashMap<String, Object>(); - location.put(LOCATION_EXTENSION_COUNTRY, country); - extensions.put(LOCATION_EXTENSION, location); - } - } - - public static void addLanguageExtension(Activity activity, Article value) { - Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity); - String language = value.getLanguage(); - if (language != null) { - extensions.put(LANGUAGE_EXTENSION, language); - } - } - - public static Date parse(String str) { - DateFormat fmt = new SimpleDateFormat(DATE_FORMAT); - try { - return fmt.parse(str); - } catch (ParseException e) { - throw new IllegalArgumentException("Invalid date format", e); - } - } - - private static String getProviderID(Feed feed) { - return getProviderID(feed.getPublishingPlatform() == null ? feed.getMediaType() : feed.getPublishingPlatform()); - } - - private static String getProviderID(String feed) { - return feed.toLowerCase().replace(" ", "_").trim(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverClient.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverClient.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverClient.java new file mode 100644 index 0000000..05e6120 --- /dev/null +++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverClient.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.moreover; + +import org.apache.commons.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.StringWriter; +import java.math.BigInteger; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.Charset; +import java.util.Date; + +/** + * + */ +public class MoreoverClient { + private static final Logger logger = LoggerFactory.getLogger(MoreoverClient.class); + + private static final String BASE_URL = "http://metabase.moreover.com/api/v10/articles?key=%s&limit=%s&sequence_id=%s"; + private final String id; + private String apiKey; + private BigInteger lastSequenceId = BigInteger.ZERO; + //testing purpose only + public long pullTime; + private boolean debug; + + public MoreoverClient(String id, String apiKey, String sequence) { + logger.info("Constructed new client for id:{} key:{} sequence:{}", id, apiKey, sequence); + this.id = id; + this.apiKey = apiKey; + this.lastSequenceId = new BigInteger(sequence); + } + + public MoreoverResult getArticlesAfter(String sequenceId, int limit) throws IOException { + String urlString = String.format(BASE_URL, this.apiKey, limit, sequenceId); + logger.debug("Making call to {}", urlString); + long start = System.nanoTime(); + MoreoverResult result = new MoreoverResult(id, getArticles(new URL(urlString)), start, 
System.nanoTime()); + if(!result.getMaxSequencedId().equals(BigInteger.ZERO)) + { + this.lastSequenceId = result.getMaxSequencedId(); + logger.debug("Maximum sequence from last call {}", this.lastSequenceId); + } + else + logger.debug("No maximum sequence returned in last call {}", this.lastSequenceId); + return result; + } + + public MoreoverResult getNextBatch() throws IOException{ + logger.debug("Getting next results for {} {} {}", this.id, this.apiKey, this.lastSequenceId); + return getArticlesAfter(this.lastSequenceId.toString(), 500); + } + + private String getArticles2(URL url) throws IOException { + HttpURLConnection cn = (HttpURLConnection) url.openConnection(); + cn.setRequestMethod("GET"); + cn.addRequestProperty("Content-Type", "text/xml;charset=UTF-8"); + cn.setDoInput(true); + cn.setDoOutput(false); + BufferedReader reader = new BufferedReader(new InputStreamReader(cn.getInputStream(), Charset.forName("UTF-8"))); + String line = null; + StringBuilder builder = new StringBuilder(); + String s = ""; + String result = new String(s.getBytes(Charset.forName("UTF-8")), Charset.forName("UTF-8")); + while((line = reader.readLine()) != null) { + result+=line; + } + pullTime = new Date().getTime(); + return result; + } + + private String getArticles(URL url) throws IOException{ + HttpURLConnection cn = (HttpURLConnection) url.openConnection(); + cn.setRequestMethod("GET"); + cn.addRequestProperty("Content-Type", "text/xml;charset=UTF-8"); + cn.setDoInput(true); + cn.setDoOutput(false); + StringWriter writer = new StringWriter(); + IOUtils.copy(new InputStreamReader(cn.getInputStream(), Charset.forName("UTF-8")), writer); + writer.flush(); + pullTime = new Date().getTime(); + + // added after seeing java.net.SocketException: Too many open files + cn.disconnect(); + + return writer.toString(); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverConfigurator.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverConfigurator.java new file mode 100644 index 0000000..8d89582 --- /dev/null +++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverConfigurator.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.moreover; + +import com.google.common.collect.Lists; +import com.typesafe.config.Config; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * Created by sblackmon on 12/10/13. 
+ */ +public class MoreoverConfigurator { + + private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverConfigurator.class); + + public static MoreoverConfiguration detectConfiguration(Config moreover) { + + MoreoverConfiguration moreoverConfiguration = new MoreoverConfiguration(); + + List<MoreoverKeyData> apiKeys = Lists.newArrayList(); + + Config apiKeysConfig = moreover.getConfig("apiKeys"); + + if( !apiKeysConfig.isEmpty()) + for( String apiKeyId : apiKeysConfig.root().keySet() ) { + Config apiKeyConfig = apiKeysConfig.getConfig(apiKeyId); + apiKeys.add(new MoreoverKeyData() + .withId(apiKeyConfig.getString("key")) + .withKey(apiKeyConfig.getString("key")) + .withStartingSequence(apiKeyConfig.getString("startingSequence"))); + } + moreoverConfiguration.setApiKeys(apiKeys); + + return moreoverConfiguration; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverJsonActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverJsonActivitySerializer.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverJsonActivitySerializer.java new file mode 100644 index 0000000..17fde37 --- /dev/null +++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverJsonActivitySerializer.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.moreover; + +import com.fasterxml.jackson.databind.AnnotationIntrospector; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector; +import com.moreover.api.Article; +import org.apache.commons.lang.NotImplementedException; +import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.moreover.MoreoverUtils; +import org.apache.streams.pojo.json.Activity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +/** + * Deserializes Moreover JSON format into Activities + */ +public class MoreoverJsonActivitySerializer implements ActivitySerializer<String> { + + private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverJsonActivitySerializer.class); + + public MoreoverJsonActivitySerializer() { + } + + @Override + public String serializationFormat() { + return "application/json+vnd.moreover.com.v1"; + } + + @Override + public String serialize(Activity deserialized) { + throw new UnsupportedOperationException("Cannot currently serialize to Moreover JSON"); + } + + @Override + public Activity deserialize(String serialized) { + serialized = serialized.replaceAll("\\[[ ]*\\]", "null"); + + LOGGER.debug(serialized); + + ObjectMapper mapper = new 
ObjectMapper(); + AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(mapper.getTypeFactory()); + mapper.setAnnotationIntrospector(introspector); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE); + mapper.configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, Boolean.FALSE); + mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE); + mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE); + mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.TRUE); + + Article article; + try { + ObjectNode node = (ObjectNode)mapper.readTree(serialized); + node.remove("tags"); + node.remove("locations"); + node.remove("companies"); + node.remove("topics"); + node.remove("media"); + node.remove("outboundUrls"); + ObjectNode jsonNodes = (ObjectNode) node.get("source").get("feed"); + jsonNodes.remove("editorialTopics"); + jsonNodes.remove("tags"); + jsonNodes.remove("autoTopics"); + article = mapper.convertValue(node, Article.class); + } catch (IOException e) { + throw new IllegalArgumentException("Unable to deserialize", e); + } + return MoreoverUtils.convert(article); + } + + @Override + public List<Activity> deserializeAll(List<String> serializedList) { + throw new NotImplementedException("Not currently implemented"); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java new file mode 100644 index 0000000..78d8e9d --- /dev/null +++ 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.moreover; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.*; +import com.google.common.util.concurrent.Uninterruptibles; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProvider; +import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.PrintStream; +import java.math.BigInteger; +import java.util.*; +import java.util.concurrent.*; + +/** + * Streams 
Provider for the Moreover Metabase API + * + * To use from command line: + * + * Supply configuration similar to src/test/resources/rss.conf + * + * Launch using: + * + * mvn exec:java -Dexec.mainClass=org.apache.streams.moreover.MoreoverProvider -Dexec.args="rss.conf articles.json" + */ +public class MoreoverProvider implements StreamsProvider { + + public final static String STREAMS_ID = "MoreoverProvider"; + + private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverProvider.class); + + protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>(); + + private List<MoreoverKeyData> keys; + + private MoreoverConfiguration config; + + private ExecutorService executor; + + public MoreoverProvider(MoreoverConfiguration moreoverConfiguration) { + this.config = moreoverConfiguration; + this.keys = Lists.newArrayList(); + for( MoreoverKeyData apiKey : config.getApiKeys()) { + this.keys.add(apiKey); + } + } + + @Override + public String getId() { + return STREAMS_ID; + } + + public void startStream() { + + for(MoreoverKeyData key : keys) { + MoreoverProviderTask task = new MoreoverProviderTask(key.getId(), key.getKey(), this.providerQueue, key.getStartingSequence()); + executor.submit(new Thread(task)); + LOGGER.info("Started producer for {}", key.getKey()); + } + + } + + @Override + public synchronized StreamsResultSet readCurrent() { + + LOGGER.debug("readCurrent: {}", providerQueue.size()); + + Collection<StreamsDatum> currentIterator = Lists.newArrayList(); + Iterators.addAll(currentIterator, providerQueue.iterator()); + + StreamsResultSet current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator)); + + providerQueue.clear(); + + return current; + } + + @Override + public StreamsResultSet readNew(BigInteger sequence) { + return null; + } + + @Override + public StreamsResultSet readRange(DateTime start, DateTime end) { + return null; + } + + @Override + public boolean isRunning() { + return 
!executor.isShutdown() && !executor.isTerminated(); + } + + @Override + public void prepare(Object configurationObject) { + LOGGER.debug("Prepare"); + executor = Executors.newSingleThreadExecutor(); + } + + @Override + public void cleanUp() { + + } + + public static void main(String[] args) throws Exception { + + Preconditions.checkArgument(args.length >= 2); + + String configfile = args[0]; + String outfile = args[1]; + + Config reference = ConfigFactory.load(); + File conf_file = new File(configfile); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + + StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); + MoreoverConfiguration config = new ComponentConfigurator<>(MoreoverConfiguration.class).detectConfiguration(typesafe, "rss"); + MoreoverProvider provider = new MoreoverProvider(config); + + ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); + provider.prepare(config); + provider.startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); + while(iterator.hasNext()) { + StreamsDatum datum = iterator.next(); + String json; + try { + json = mapper.writeValueAsString(datum.getDocument()); + outStream.println(json); + } catch (JsonProcessingException e) { + System.err.println(e.getMessage()); + } + } + } while( provider.isRunning()); + provider.cleanUp(); + outStream.flush(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java 
---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java new file mode 100644 index 0000000..ad92d73 --- /dev/null +++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.streams.moreover; + +import com.google.common.collect.ImmutableSet; +import org.apache.streams.core.StreamsDatum; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Queue; + +/** + * Task to pull from the Morever API + */ +public class MoreoverProviderTask implements Runnable { + + public static final int LATENCY = 10; + public static final int REQUIRED_LATENCY = LATENCY * 1000; + private static Logger logger = LoggerFactory.getLogger(MoreoverProviderTask.class); + + private String lastSequence; + private final String apiKey; + private final String apiId; + private final Queue<StreamsDatum> results; + private final MoreoverClient moClient; + private boolean started = false; + + public MoreoverProviderTask(String apiId, String apiKey, Queue<StreamsDatum> results, String lastSequence) { + //logger.info("Constructed new task {} for {} {} {}", UUID.randomUUID().toString(), apiId, apiKey, lastSequence); + this.apiId = apiId; + this.apiKey = apiKey; + this.results = results; + this.lastSequence = lastSequence; + this.moClient = new MoreoverClient(this.apiId, this.apiKey, this.lastSequence); + initializeClient(moClient); + } + + @Override + public void run() { + while(true) { + try { + ensureTime(moClient); + MoreoverResult result = moClient.getArticlesAfter(lastSequence, 500); + started = true; + lastSequence = result.process().toString(); + for(StreamsDatum entry : ImmutableSet.copyOf(result.iterator())) + results.offer(entry); + logger.info("ApiKey={}\tlastSequenceid={}", this.apiKey, lastSequence); + + } catch (Exception e) { + logger.error("Exception while polling moreover", e); + } + } + } + + private void ensureTime(MoreoverClient moClient) { + try { + long gap = System.currentTimeMillis() - moClient.pullTime; + if (gap < REQUIRED_LATENCY) + Thread.sleep(REQUIRED_LATENCY - gap); + } catch (Exception e) { + logger.warn("Error sleeping for latency"); + } + } + + private void initializeClient(MoreoverClient 
moClient) { + try { + moClient.getArticlesAfter(this.lastSequence, 2); + } catch (Exception e) { + logger.error("Failed to start stream, {}", this.apiKey); + logger.error("Exception : ", e); + throw new IllegalStateException("Unable to initialize stream", e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResult.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResult.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResult.java new file mode 100644 index 0000000..e07084f --- /dev/null +++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResult.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.streams.moreover; + +import com.fasterxml.aalto.stax.InputFactoryImpl; +import com.fasterxml.aalto.stax.OutputFactoryImpl; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.xml.JacksonXmlModule; +import com.fasterxml.jackson.dataformat.xml.XmlFactory; +import com.fasterxml.jackson.dataformat.xml.XmlMapper; +import com.google.common.collect.Lists; +import com.moreover.api.Article; +import com.moreover.api.ArticlesResponse; +import org.apache.streams.core.StreamsDatum; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.math.BigInteger; +import java.util.Iterator; +import java.util.List; + + +public class MoreoverResult implements Iterable<StreamsDatum> { + + private static final Logger logger = LoggerFactory.getLogger(MoreoverResult.class); + + private ObjectMapper mapper; + private XmlMapper xmlMapper; + + private String xmlString; + private String jsonString; + private ArticlesResponse resultObject; + private ArticlesResponse.Articles articles; + private List<Article> articleArray; + private long start; + private long end; + private String clientId; + private BigInteger maxSequencedId = BigInteger.ZERO; + + protected ArticlesResponse response; + protected List<StreamsDatum> list = Lists.newArrayList(); + + protected MoreoverResult(String clientId, String xmlString, long start, long end) { + this.xmlString = xmlString; + this.clientId = clientId; + this.start = start; + this.end = end; + XmlFactory f = new XmlFactory(new InputFactoryImpl(), + new OutputFactoryImpl()); + + JacksonXmlModule module = new JacksonXmlModule(); + + module.setDefaultUseWrapper(false); + + xmlMapper = new XmlMapper(f, module); + + xmlMapper + .configure( + DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, + Boolean.TRUE); + xmlMapper + 
.configure( + DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, + Boolean.TRUE); + xmlMapper + .configure( + DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY, + Boolean.TRUE); + xmlMapper.configure( + DeserializationFeature.READ_ENUMS_USING_TO_STRING, + Boolean.TRUE); + xmlMapper.configure( + DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, + Boolean.FALSE); + + mapper = new ObjectMapper(); + + mapper + .configure( + DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, + Boolean.TRUE); + mapper.configure( + DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, + Boolean.TRUE); + mapper + .configure( + DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY, + Boolean.TRUE); + mapper.configure( + DeserializationFeature.READ_ENUMS_USING_TO_STRING, + Boolean.TRUE); + + } + + public String getClientId() { + return clientId; + } + + public long getStart() { + return start; + } + + public long getEnd() { + return end; + } + + public BigInteger process() { + + try { + this.resultObject = xmlMapper.readValue(xmlString, ArticlesResponse.class); + } catch (JsonMappingException e) { + // theory is this may not be fatal + this.resultObject = (ArticlesResponse) e.getPath().get(0).getFrom(); + } catch (Exception e) { + e.printStackTrace(); + logger.warn("Unable to process document:"); + logger.warn(xmlString); + } + + if( this.resultObject.getStatus().equals("FAILURE")) + { + logger.warn(this.resultObject.getStatus()); + logger.warn(this.resultObject.getMessageCode()); + logger.warn(this.resultObject.getUserMessage()); + logger.warn(this.resultObject.getDeveloperMessage()); + } + else + { + this.articles = resultObject.getArticles(); + this.articleArray = articles.getArticle(); + + for (Article article : articleArray) { + BigInteger sequenceid = new BigInteger(article.getSequenceId()); + list.add(new StreamsDatum(article, sequenceid)); + logger.trace("Prior max sequence Id {} current candidate {}", this.maxSequencedId, sequenceid); + if 
(sequenceid.compareTo(this.maxSequencedId) > 0) { + this.maxSequencedId = sequenceid; + } + } + } + + return this.maxSequencedId; + } + + public String getXmlString() { + return this.xmlString; + } + + public BigInteger getMaxSequencedId() { + return this.maxSequencedId; + } + + @Override + public Iterator<StreamsDatum> iterator() { + return list.iterator(); + } + + protected static class JsonStringIterator implements Iterator<Serializable> { + + private Iterator<Serializable> underlying; + + protected JsonStringIterator(Iterator<Serializable> underlying) { + this.underlying = underlying; + } + + @Override + public boolean hasNext() { + return underlying.hasNext(); + } + + @Override + public String next() { + return underlying.next().toString(); + } + + @Override + public void remove() { + underlying.remove(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResultSetWrapper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResultSetWrapper.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResultSetWrapper.java new file mode 100644 index 0000000..0a47bd1 --- /dev/null +++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResultSetWrapper.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.moreover; + +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsResultSet; + +import java.util.Queue; + +public class MoreoverResultSetWrapper extends StreamsResultSet { + + public MoreoverResultSetWrapper(MoreoverResult underlying) { + super((Queue<StreamsDatum>)underlying); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverUtils.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverUtils.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverUtils.java new file mode 100644 index 0000000..45d26df --- /dev/null +++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverUtils.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.moreover; + +import com.moreover.api.*; +import org.apache.streams.pojo.extensions.ExtensionUtil; +import org.apache.streams.pojo.json.Activity; +import org.apache.streams.pojo.json.ActivityObject; +import org.apache.streams.pojo.json.Actor; +import org.apache.streams.pojo.json.Provider; +import org.joda.time.DateTime; + +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.*; + +import static org.apache.streams.data.util.ActivityUtil.*; + +/** + * Provides utilities for Moroever data + */ +public class MoreoverUtils { + private MoreoverUtils() { + } + + public static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'"; + + public static Activity convert(Article article) { + Activity activity = new Activity(); + Source source = article.getSource(); + activity.setActor(convert(article.getAuthor(), source.getName())); + activity.setProvider(convert(source)); + activity.setTarget(convertTarget(source)); + activity.setObject(convertObject(article)); + activity.setPublished(DateTime.parse(article.getPublishedDate())); + activity.setContent(article.getContent()); + activity.setTitle(article.getTitle()); + activity.setVerb("posted"); + fixActivityId(activity); + addLocationExtension(activity, source); + addLanguageExtension(activity, article); + activity.setLinks(convertLinks(article)); + return activity; + } + + private static void 
fixActivityId(Activity activity) { + if (activity.getId() != null && activity.getId().matches("\\{[a-z]*\\}")) { + activity.setId(null); + } + } + + private static List convertLinks(Article article) { + List list = new LinkedList(); + Article.OutboundUrls outboundUrls = article.getOutboundUrls(); + if (outboundUrls != null) { + for (String url : outboundUrls.getOutboundUrl()) { + list.add(url); + } + } + return list; + } + + public static ActivityObject convertTarget(Source source) { + ActivityObject object = new ActivityObject(); + object.setUrl(source.getHomeUrl()); + object.setDisplayName(source.getName()); + return object; + } + + public static ActivityObject convertObject(Article article) { + ActivityObject object = new ActivityObject(); + object.setContent(article.getContent()); + object.setSummary(article.getTitle()); + object.setUrl(article.getOriginalUrl()); + object.setObjectType(article.getDataFormat()); + String type = article.getDataFormat().equals("text") ? "article" : article.getDataFormat(); + object.setId(getObjectId(getProviderID(article.getSource().getFeed()), type, article.getId())); + object.setPublished(DateTime.parse(article.getPublishedDate())); + return object; + } + + public static Provider convert(Source source) { + Provider provider = new Provider(); + Feed feed = source.getFeed(); + String display = getProviderID(feed); + provider.setId(getProviderId(display.trim().toLowerCase().replace(" ", "_"))); + provider.setDisplayName(display); + provider.setUrl(feed.getUrl()); + return provider; + } + + public static Actor convert(Author author, String platformName) { + Actor actor = new Actor(); + AuthorPublishingPlatform platform = author.getPublishingPlatform(); + String userId = platform.getUserId(); + if (userId != null) actor.setId(getPersonId(getProviderID(platformName), userId)); + actor.setDisplayName(author.getName()); + actor.setUrl(author.getHomeUrl()); + actor.setSummary(author.getDescription()); + 
actor.setAdditionalProperty("email", author.getEmail()); + return actor; + } + + public static void addLocationExtension(Activity activity, Source value) { + Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity); + String country = value.getLocation().getCountryCode() == null ? value.getLocation().getCountry() : value.getLocation().getCountryCode(); + if (country != null) { + Map<String, Object> location = new HashMap<String, Object>(); + location.put(LOCATION_EXTENSION_COUNTRY, country); + extensions.put(LOCATION_EXTENSION, location); + } + } + + public static void addLanguageExtension(Activity activity, Article value) { + Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity); + String language = value.getLanguage(); + if (language != null) { + extensions.put(LANGUAGE_EXTENSION, language); + } + } + + public static Date parse(String str) { + DateFormat fmt = new SimpleDateFormat(DATE_FORMAT); + try { + return fmt.parse(str); + } catch (ParseException e) { + throw new IllegalArgumentException("Invalid date format", e); + } + } + + private static String getProviderID(Feed feed) { + return getProviderID(feed.getPublishingPlatform() == null ? 
feed.getMediaType() : feed.getPublishingPlatform()); + } + + private static String getProviderID(String feed) { + return feed.toLowerCase().replace(" ", "_").trim(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverXmlActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverXmlActivitySerializer.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverXmlActivitySerializer.java new file mode 100644 index 0000000..4b7b3b0 --- /dev/null +++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverXmlActivitySerializer.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.streams.moreover; + +import com.moreover.api.Article; +import com.moreover.api.ArticlesResponse; +import com.moreover.api.ObjectFactory; +import org.apache.commons.lang.SerializationException; +import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.moreover.MoreoverUtils; +import org.apache.streams.pojo.json.Activity; + +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBElement; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Unmarshaller; +import java.io.StringReader; +import java.util.LinkedList; +import java.util.List; + +/** + * Deserializes the Moreover Article XML and converts it to an instance of {@link Activity} + */ +public class MoreoverXmlActivitySerializer implements ActivitySerializer<String> { + + //JAXBContext is threadsafe (supposedly) + private final JAXBContext articleContext; + private final JAXBContext articlesContext; + + public MoreoverXmlActivitySerializer() { + articleContext = createContext(Article.class); + articlesContext = createContext(ArticlesResponse.class); + } + + @Override + public String serializationFormat() { + return "application/xml+vnd.moreover.com.v1"; + } + + @Override + public String serialize(Activity deserialized) { + throw new UnsupportedOperationException("Cannot currently serialize to Moreover"); + } + + @Override + public Activity deserialize(String serialized) { + Article article = deserializeMoreover(serialized); + return MoreoverUtils.convert(article); + } + + @Override + public List<Activity> deserializeAll(List<String> serializedList) { + List<Activity> activities = new LinkedList<Activity>(); + for(String item : serializedList) { + ArticlesResponse response = deserializeMoreoverResponse(item); + for(Article article : response.getArticles().getArticle()) { + activities.add(MoreoverUtils.convert(article)); + } + } + return activities; + } + + private Article deserializeMoreover(String serialized){ + try { + Unmarshaller unmarshaller = 
articleContext.createUnmarshaller(); + return (Article) unmarshaller.unmarshal(new StringReader(serialized)); + } catch (JAXBException e) { + throw new SerializationException("Unable to deserialize Moreover data", e); + } + } + + private ArticlesResponse deserializeMoreoverResponse(String serialized){ + try { + Unmarshaller unmarshaller = articlesContext.createUnmarshaller(); + return ((JAXBElement<ArticlesResponse>) unmarshaller.unmarshal(new StringReader(serialized))).getValue(); + } catch (JAXBException e) { + throw new SerializationException("Unable to deserialize Moreover data", e); + } + } + + private JAXBContext createContext(Class articleClass) { + JAXBContext context; + try { + context = JAXBContext.newInstance(articleClass.getPackage().getName(), ObjectFactory.class.getClassLoader()); + } catch (JAXBException e) { + throw new IllegalStateException("Unable to create JAXB Context for Moreover data", e); + } + return context; + } +}
