STREAMS-463: Move every class in all repos underneath org.apache.streams, this closes apache/incubator-streams#356
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b71cce83 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b71cce83 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b71cce83 Branch: refs/heads/master Commit: b71cce83de01f9c07db59e0af9915e75366c86db Parents: 571d406 Author: smarthi <[email protected]> Authored: Tue Feb 7 23:31:32 2017 -0500 Committer: smarthi <[email protected]> Committed: Tue Feb 7 23:31:32 2017 -0500 ---------------------------------------------------------------------- README.md | 2 +- src/site/markdown/release.md | 4 +- .../streams-persist-cassandra/pom.xml | 5 - streams-contrib/streams-persist-graph/pom.xml | 7 - .../hdfs/test/HdfsPersistConfigTest.java | 243 ++++++------- .../streams/hdfs/test/TestHdfsPersist.java | 111 +++--- streams-contrib/streams-persist-mongo/pom.xml | 5 - streams-contrib/streams-persist-neo4j/pom.xml | 5 - .../gmail/provider/GMailImapProviderTask.java | 62 ---- .../GMailMessageActivitySerializer.java | 189 ---------- .../google/gmail/provider/GMailProvider.java | 191 ---------- .../gmail/provider/GMailImapProviderTask.java | 62 ++++ .../GMailMessageActivitySerializer.java | 193 ++++++++++ .../streams/gmail/provider/GMailProvider.java | 183 ++++++++++ .../com/google/gmail/GMailConfiguration.json | 17 - .../google/gmail/GMailConfiguration.json | 17 + .../gmail/test/GMailMessageSerDeTest.java | 67 ---- .../gmail/test/GMailMessageSerDeTest.java | 67 ++++ .../processor/GooglePlusCommentProcessor.java | 98 ----- .../processor/GooglePlusTypeConverter.java | 140 -------- .../gplus/provider/AbstractGPlusProvider.java | 260 -------------- .../gplus/provider/GPlusActivitySerializer.java | 70 ---- .../gplus/provider/GPlusDataCollector.java | 68 ---- .../provider/GPlusUserActivityCollector.java | 152 -------- .../provider/GPlusUserActivityProvider.java | 131 ------- .../gplus/provider/GPlusUserDataCollector.java | 106 ------ .../gplus/provider/GPlusUserDataProvider.java | 134 ------- .../util/GPlusActivityDeserializer.java | 172 --------- .../util/GPlusCommentDeserializer.java | 104 ------ .../serializer/util/GPlusEventClassifier.java | 68 ---- .../util/GPlusPersonDeserializer.java | 122 ------- .../serializer/util/GooglePlusActivityUtil.java | 298 ---------------- .../processor/GooglePlusCommentProcessor.java | 96 +++++ .../processor/GooglePlusTypeConverter.java | 140 ++++++++ .../gplus/provider/AbstractGPlusProvider.java | 260 ++++++++++++++ .../gplus/provider/GPlusActivitySerializer.java | 70 ++++ .../gplus/provider/GPlusDataCollector.java | 68 ++++ .../provider/GPlusUserActivityCollector.java | 152 ++++++++ .../provider/GPlusUserActivityProvider.java | 131 +++++++ .../gplus/provider/GPlusUserDataCollector.java | 106 ++++++ .../gplus/provider/GPlusUserDataProvider.java | 133 +++++++ .../util/GPlusActivityDeserializer.java | 172 +++++++++ .../util/GPlusCommentDeserializer.java | 103 ++++++ .../serializer/util/GPlusEventClassifier.java | 68 ++++ .../util/GPlusPersonDeserializer.java | 122 +++++++ .../serializer/util/GooglePlusActivityUtil.java | 298 ++++++++++++++++ .../com/google/gplus/GPlusConfiguration.json | 101 ------ .../google/gplus/GPlusConfiguration.json | 101 ++++++ .../google/gplus/GooglePlusCommentSerDeIT.java | 122 ------- .../google/gplus/GooglePlusPersonSerDeIT.java | 106 ------ .../processor/GooglePlusActivitySerDeIT.java | 119 ------- .../provider/TestAbstractGPlusProvider.java | 97 ----- .../TestGPlusUserActivityCollector.java | 308 ---------------- .../provider/TestGPlusUserDataCollector.java | 137 ------- .../util/GPlusEventClassifierTest.java | 75 ---- .../streams/gplus/GooglePlusCommentSerDeIT.java | 116 ++++++ .../streams/gplus/GooglePlusPersonSerDeIT.java | 103 ++++++ .../processors/GooglePlusActivitySerDeIT.java | 119 +++++++ .../processors/GooglePlusTypeConverterIT.java | 132 +++++++ .../providers/GPlusUserActivityProviderIT.java | 67 ++++ .../providers/GPlusUserDataProviderIT.java | 70 ++++ .../providers/TestAbstractGPlusProvider.java | 98 +++++ .../TestGPlusUserActivityCollector.java | 299 ++++++++++++++++ .../providers/TestGPlusUserDataCollector.java | 135 +++++++ .../util/GPlusEventClassifierTest.java | 71 ++++ .../processors/GooglePlusTypeConverterIT.java | 132 ------- .../providers/GPlusUserActivityProviderIT.java | 67 ---- .../test/providers/GPlusUserDataProviderIT.java | 69 ---- .../com/instagram/InstagramConfiguration.json | 71 ---- .../InstagramUserInformationConfiguration.json | 20 -- .../instagram/InstagramConfiguration.json | 71 ++++ .../InstagramUserInformationConfiguration.json | 20 ++ .../main/jsonschema/com/moreover/Moreover.json | 337 ------------------ .../com/moreover/MoreoverConfiguration.json | 38 -- .../org/apache/streams/moreover/Moreover.json | 337 ++++++++++++++++++ .../streams/moreover/MoreoverConfiguration.json | 38 ++ .../perpetual/RssFeedSchedulerTest.java | 20 +- .../sysomos/provider/SysomosProvider.java | 128 +++---- .../main/jsonschema/com/sysomos/Sysomos.json | 61 ---- .../com/sysomos/SysomosConfiguration.json | 43 --- .../org/apache/streams/sysomos/Sysomos.json | 61 ++++ .../streams/sysomos/SysomosConfiguration.json | 43 +++ .../com/sysomos/test/SysomosJsonSerDeIT.java | 68 ---- .../com/sysomos/test/SysomosXmlSerDeIT.java | 102 ------ .../sysomos/test/SysomosJsonSerDeIT.java | 69 ++++ .../streams/sysomos/test/SysomosXmlSerDeIT.java | 102 ++++++ .../src/main/jsonschema/com/twitter/Delete.json | 37 -- .../src/main/jsonschema/com/twitter/Follow.json | 18 - .../main/jsonschema/com/twitter/FriendList.json | 24 -- .../main/jsonschema/com/twitter/Retweet.json | 19 - .../com/twitter/TwitterConfiguration.json | 87 ----- .../twitter/TwitterFollowingConfiguration.json | 33 -- .../com/twitter/TwitterStreamConfiguration.json | 45 --- .../TwitterTimelineProviderConfiguration.json | 23 -- .../TwitterUserInformationConfiguration.json | 25 -- .../src/main/jsonschema/com/twitter/User.json | 129 ------- .../jsonschema/com/twitter/UserstreamEvent.json | 52 --- .../src/main/jsonschema/com/twitter/tweet.json | 219 ------------ .../streams/twitter/TwitterConfiguration.json | 87 +++++ .../twitter/TwitterFollowingConfiguration.json | 33 ++ .../twitter/TwitterStreamConfiguration.json | 45 +++ .../TwitterTimelineProviderConfiguration.json | 23 ++ .../TwitterUserInformationConfiguration.json | 25 ++ .../org/apache/streams/twitter/pojo/Delete.json | 37 ++ .../org/apache/streams/twitter/pojo/Follow.json | 18 + .../apache/streams/twitter/pojo/FriendList.json | 24 ++ .../apache/streams/twitter/pojo/Retweet.json | 19 + .../org/apache/streams/twitter/pojo/User.json | 129 +++++++ .../streams/twitter/pojo/UserstreamEvent.json | 52 +++ .../org/apache/streams/twitter/pojo/tweet.json | 219 ++++++++++++ .../youtube/processor/YoutubeTypeConverter.java | 129 ------- .../provider/YoutubeChannelDataCollector.java | 114 ------ .../provider/YoutubeChannelProvider.java | 132 ------- .../youtube/provider/YoutubeDataCollector.java | 70 ---- .../com/youtube/provider/YoutubeProvider.java | 277 --------------- .../provider/YoutubeUserActivityCollector.java | 229 ------------ .../provider/YoutubeUserActivityProvider.java | 132 ------- .../youtube/serializer/YoutubeActivityUtil.java | 200 ----------- .../serializer/YoutubeChannelDeserializer.java | 152 -------- .../serializer/YoutubeEventClassifier.java | 62 ---- .../serializer/YoutubeVideoDeserializer.java | 118 ------- .../youtube/processor/YoutubeTypeConverter.java | 129 +++++++ .../provider/YoutubeChannelDataCollector.java | 114 ++++++ .../provider/YoutubeChannelProvider.java | 124 +++++++ .../youtube/provider/YoutubeDataCollector.java | 70 ++++ .../youtube/provider/YoutubeProvider.java | 270 ++++++++++++++ .../provider/YoutubeUserActivityCollector.java | 228 ++++++++++++ .../provider/YoutubeUserActivityProvider.java | 132 +++++++ .../youtube/serializer/YoutubeActivityUtil.java | 200 +++++++++++ .../serializer/YoutubeChannelDeserializer.java | 152 ++++++++ .../serializer/YoutubeEventClassifier.java | 62 ++++ .../serializer/YoutubeVideoDeserializer.java | 117 ++++++ .../com/youtube/YoutubeConfiguration.json | 105 ------ .../streams/youtube/YoutubeConfiguration.json | 105 ++++++ .../processor/YoutubeTypeConverterTest.java | 111 ------ .../YoutubeChannelDataCollectorTest.java | 102 ------ .../youtube/provider/YoutubeProviderTest.java | 166 --------- .../YoutubeUserActivityCollectorTest.java | 354 ------------------- .../serializer/YoutubeEventClassifierTest.java | 50 --- .../serializer/YoutubeVideoSerDeTest.java | 123 ------- .../processor/YoutubeTypeConverterTest.java | 111 ++++++ .../YoutubeChannelDataCollectorTest.java | 102 ++++++ .../youtube/provider/YoutubeProviderTest.java | 166 +++++++++ .../YoutubeUserActivityCollectorTest.java | 354 +++++++++++++++++++ .../serializer/YoutubeEventClassifierTest.java | 50 +++ .../serializer/YoutubeVideoSerDeTest.java | 120 +++++++ .../providers/YoutubeChannelProviderIT.java | 3 +- .../YoutubeUserActivityProviderIT.java | 2 +- .../streams-plugin-cassandra/pom.xml | 5 - .../streams-plugin-elasticsearch/pom.xml | 5 - streams-plugins/streams-plugin-hbase/pom.xml | 5 - streams-plugins/streams-plugin-hive/pom.xml | 5 - streams-plugins/streams-plugin-pig/pom.xml | 5 - streams-plugins/streams-plugin-scala/pom.xml | 6 - .../src/test/resources/log4j.properties | 2 +- .../src/test/resources/logback-test.xml | 2 +- 156 files changed, 8060 insertions(+), 8179 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index b51df17..1d1a4b2 100644 --- a/README.md +++ b/README.md @@ -19,4 +19,4 @@ Apache Streams is an effort undergoing incubation at [The Apache Software Founda ## License -Copyright (c) 2016. Apache Streams is licensed under the [Apache License 2.0](LICENSE). +Copyright (c) 2017. Apache Streams is licensed under the [Apache License 2.0](LICENSE). http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/src/site/markdown/release.md ---------------------------------------------------------------------- diff --git a/src/site/markdown/release.md b/src/site/markdown/release.md index 7998b4f..44730be 100644 --- a/src/site/markdown/release.md +++ b/src/site/markdown/release.md @@ -24,8 +24,8 @@ As an alternative to releasing separately, the projects MAY be released together export MAVEN_OPTS="-Xmx1024m -XX:MaxPermSize=256m" - 2. Use the latest Sun 1.7.0 JDK - 3. Use Maven 3.2.1 or later + 2. Use the latest Sun 1.8.x JDK + 3. Use Maven 3.3.9 or later 4. Make sure the [Release Setup](release-setup.html) steps have been performed. 2. Prepare the source for release: http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-persist-cassandra/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-cassandra/pom.xml b/streams-contrib/streams-persist-cassandra/pom.xml index 75785dd..498548a 100644 --- a/streams-contrib/streams-persist-cassandra/pom.xml +++ b/streams-contrib/streams-persist-cassandra/pom.xml @@ -84,11 +84,6 @@ <scope>test</scope> </dependency> <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-testing</artifactId> <version>${project.version}</version> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-persist-graph/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/pom.xml b/streams-contrib/streams-persist-graph/pom.xml index 996c706..bda1943 100644 --- a/streams-contrib/streams-persist-graph/pom.xml +++ b/streams-contrib/streams-persist-graph/pom.xml @@ -76,12 +76,6 @@ </dependency> <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - - <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-testing</artifactId> <version>${project.version}</version> @@ -91,7 +85,6 @@ </dependencies> <build> <sourceDirectory>src/main/java</sourceDirectory> - <testSourceDirectory>src/test/java</testSourceDirectory> <resources> <resource> <directory>src/main/resources</directory> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java index 6e5f351..6d7aaf6 100644 --- a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java +++ b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java @@ -25,152 +25,143 @@ import org.apache.streams.hdfs.HdfsWriterConfiguration; import org.apache.streams.hdfs.WebHdfsPersistReader; import org.apache.streams.hdfs.WebHdfsPersistWriter; +import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.URISyntaxException; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - /** * Test for checking that strings append to FS paths as expected */ public class HdfsPersistConfigTest { - private static final Logger LOGGER = LoggerFactory.getLogger(HdfsPersistConfigTest.class); - - @Test - public void getWriterFileUriTest() - { - HdfsWriterConfiguration writerConfiguration = new HdfsWriterConfiguration(); - writerConfiguration.setScheme(HdfsConfiguration.Scheme.FILE); - writerConfiguration.setPath("path"); - writerConfiguration.setWriterPath("writerPath"); - writerConfiguration.setUser("cloudera"); - - WebHdfsPersistWriter webHdfsPersistWriter = new WebHdfsPersistWriter(writerConfiguration); - - String uri = null; - try { - uri = webHdfsPersistWriter.getURI().toString(); - } catch (URISyntaxException e) { - fail("URI Syntax"); - } - assertArrayEquals(uri.toCharArray(), ("file:///").toCharArray()); - webHdfsPersistWriter.prepare(null); - assertTrue(webHdfsPersistWriter.isConnected()); - } + private static final Logger LOGGER = LoggerFactory.getLogger(HdfsPersistConfigTest.class); - @Test - public void getWriterHdfsUriTest() - { - HdfsWriterConfiguration writerConfiguration = new HdfsWriterConfiguration(); - writerConfiguration.setScheme(HdfsConfiguration.Scheme.HDFS); - writerConfiguration.setHost("localhost"); - writerConfiguration.setPort(9000L); - writerConfiguration.setPath("path"); - writerConfiguration.setWriterPath("writerPath"); - writerConfiguration.setUser("cloudera"); - - WebHdfsPersistWriter webHdfsPersistWriter = new WebHdfsPersistWriter(writerConfiguration); - - String uri = null; - try { - uri = webHdfsPersistWriter.getURI().toString(); - } catch (URISyntaxException e) { - fail("URI Syntax"); - } - assertArrayEquals(uri.toCharArray(), ("hdfs://localhost:9000").toCharArray()); + @Test + public void getWriterFileUriTest() { + HdfsWriterConfiguration writerConfiguration = new HdfsWriterConfiguration(); + writerConfiguration.setScheme(HdfsConfiguration.Scheme.FILE); + writerConfiguration.setPath("path"); + writerConfiguration.setWriterPath("writerPath"); + writerConfiguration.setUser("cloudera"); - } - - @Test - public void getWriterWebHdfsUriTest() - { - HdfsWriterConfiguration writerConfiguration = new HdfsWriterConfiguration(); - writerConfiguration.setScheme(HdfsConfiguration.Scheme.WEBHDFS); - writerConfiguration.setHost("localhost"); - writerConfiguration.setPort(57000L); - writerConfiguration.setPath("path"); - writerConfiguration.setWriterPath("writerPath"); - writerConfiguration.setUser("cloudera"); - - WebHdfsPersistWriter webHdfsPersistWriter = new WebHdfsPersistWriter(writerConfiguration); - - String uri = null; - try { - uri = webHdfsPersistWriter.getURI().toString(); - } catch (URISyntaxException e) { - fail("URI Syntax"); - } - assertArrayEquals(uri.toCharArray(), ("webhdfs://localhost:57000").toCharArray()); + WebHdfsPersistWriter webHdfsPersistWriter = new WebHdfsPersistWriter(writerConfiguration); + String uri = null; + try { + uri = webHdfsPersistWriter.getURI().toString(); + } catch (URISyntaxException e) { + Assert.fail("URI Syntax"); } - - @Test - public void getReaderFileUriTest() - { - HdfsReaderConfiguration readerConfiguration = new HdfsReaderConfiguration(); - readerConfiguration.setScheme(HdfsConfiguration.Scheme.FILE); - readerConfiguration.setPath("path"); - readerConfiguration.setReaderPath("readerPath"); - - WebHdfsPersistReader webHdfsPersistReader = new WebHdfsPersistReader(readerConfiguration); - - String uri = null; - try { - uri = webHdfsPersistReader.getURI().toString(); - } catch (URISyntaxException e) { - fail("URI Syntax"); - } - assertArrayEquals(uri.toCharArray(), ("file:///").toCharArray()); + Assert.assertArrayEquals(uri.toCharArray(), ("file:///").toCharArray()); + webHdfsPersistWriter.prepare(null); + Assert.assertTrue(webHdfsPersistWriter.isConnected()); + } + + @Test + public void getWriterHdfsUriTest() { + HdfsWriterConfiguration writerConfiguration = new HdfsWriterConfiguration(); + writerConfiguration.setScheme(HdfsConfiguration.Scheme.HDFS); + writerConfiguration.setHost("localhost"); + writerConfiguration.setPort(9000L); + writerConfiguration.setPath("path"); + writerConfiguration.setWriterPath("writerPath"); + writerConfiguration.setUser("cloudera"); + + WebHdfsPersistWriter webHdfsPersistWriter = new WebHdfsPersistWriter(writerConfiguration); + + String uri = null; + try { + uri = webHdfsPersistWriter.getURI().toString(); + } catch (URISyntaxException e) { + Assert.fail("URI Syntax"); } + Assert.assertArrayEquals(uri.toCharArray(), ("hdfs://localhost:9000").toCharArray()); + + } + + @Test + public void getWriterWebHdfsUriTest() { + HdfsWriterConfiguration writerConfiguration = new HdfsWriterConfiguration(); + writerConfiguration.setScheme(HdfsConfiguration.Scheme.WEBHDFS); + writerConfiguration.setHost("localhost"); + writerConfiguration.setPort(57000L); + writerConfiguration.setPath("path"); + writerConfiguration.setWriterPath("writerPath"); + writerConfiguration.setUser("cloudera"); + + WebHdfsPersistWriter webHdfsPersistWriter = new WebHdfsPersistWriter(writerConfiguration); + + String uri = null; + try { + uri = webHdfsPersistWriter.getURI().toString(); + } catch (URISyntaxException e) { + Assert.fail("URI Syntax"); + } + Assert.assertArrayEquals(uri.toCharArray(), ("webhdfs://localhost:57000").toCharArray()); - @Test - public void getReaderHdfsUriTest() - { - HdfsReaderConfiguration readerConfiguration = new HdfsReaderConfiguration(); - readerConfiguration.setScheme(HdfsConfiguration.Scheme.HDFS); - readerConfiguration.setHost("localhost"); - readerConfiguration.setPort(9000L); - readerConfiguration.setPath("path"); - readerConfiguration.setReaderPath("readerPath"); - - WebHdfsPersistReader webHdfsPersistReader = new WebHdfsPersistReader(readerConfiguration); - - String uri = null; - try { - uri = webHdfsPersistReader.getURI().toString(); - } catch (URISyntaxException e) { - fail("URI Syntax"); - } - assertArrayEquals(uri.toCharArray(), ("hdfs://localhost:9000").toCharArray()); + } - } + @Test + public void getReaderFileUriTest() { + HdfsReaderConfiguration readerConfiguration = new HdfsReaderConfiguration(); + readerConfiguration.setScheme(HdfsConfiguration.Scheme.FILE); + readerConfiguration.setPath("path"); + readerConfiguration.setReaderPath("readerPath"); - @Test - public void getReaderWebHdfsUriTest() - { - HdfsReaderConfiguration readerConfiguration = new HdfsReaderConfiguration(); - readerConfiguration.setScheme(HdfsConfiguration.Scheme.WEBHDFS); - readerConfiguration.setHost("localhost"); - readerConfiguration.setPort(57000L); - readerConfiguration.setPath("path"); - readerConfiguration.setReaderPath("readerPath"); - - WebHdfsPersistReader webHdfsPersistReader = new WebHdfsPersistReader(readerConfiguration); - - String uri = null; - try { - uri = webHdfsPersistReader.getURI().toString(); - } catch (URISyntaxException e) { - fail("URI Syntax"); - } - assertArrayEquals(uri.toCharArray(), ("webhdfs://localhost:57000").toCharArray()); + WebHdfsPersistReader webHdfsPersistReader = new WebHdfsPersistReader(readerConfiguration); + String uri = null; + try { + uri = webHdfsPersistReader.getURI().toString(); + } catch (URISyntaxException e) { + Assert.fail("URI Syntax"); + } + Assert.assertArrayEquals(uri.toCharArray(), ("file:///").toCharArray()); + } + + @Test + public void getReaderHdfsUriTest() { + HdfsReaderConfiguration readerConfiguration = new HdfsReaderConfiguration(); + readerConfiguration.setScheme(HdfsConfiguration.Scheme.HDFS); + readerConfiguration.setHost("localhost"); + readerConfiguration.setPort(9000L); + readerConfiguration.setPath("path"); + readerConfiguration.setReaderPath("readerPath"); + + WebHdfsPersistReader webHdfsPersistReader = new WebHdfsPersistReader(readerConfiguration); + + String uri = null; + try { + uri = webHdfsPersistReader.getURI().toString(); + } catch (URISyntaxException e) { + Assert.fail("URI Syntax"); } + Assert.assertArrayEquals(uri.toCharArray(), ("hdfs://localhost:9000").toCharArray()); + + } + + @Test + public void getReaderWebHdfsUriTest() { + HdfsReaderConfiguration readerConfiguration = new HdfsReaderConfiguration(); + readerConfiguration.setScheme(HdfsConfiguration.Scheme.WEBHDFS); + readerConfiguration.setHost("localhost"); + readerConfiguration.setPort(57000L); + readerConfiguration.setPath("path"); + readerConfiguration.setReaderPath("readerPath"); + + WebHdfsPersistReader webHdfsPersistReader = new WebHdfsPersistReader(readerConfiguration); + + String uri = null; + try { + uri = webHdfsPersistReader.getURI().toString(); + } catch (URISyntaxException e) { + Assert.fail("URI Syntax"); + } + Assert.assertArrayEquals(uri.toCharArray(), ("webhdfs://localhost:57000").toCharArray()); + + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java index 1ee1e39..798a8b8 100644 --- a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java +++ b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java @@ -49,75 +49,78 @@ import java.util.List; */ public class TestHdfsPersist { - private static final Logger LOGGER = LoggerFactory.getLogger(TestHdfsPersist.class); + private static final Logger LOGGER = LoggerFactory.getLogger(TestHdfsPersist.class); - private ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + private ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - @Before - public void setup() { - File file = new File("/target/TestHdfsPersist/"); - if( file.exists()) - file.delete(); + @Before + public void setup() { + File file = new File("/target/TestHdfsPersist/"); + if (file.exists()) { + file.delete(); } + } - @Test - public void TestHdfsPersist() throws Exception { + @Test + public void TestHdfsPersist() throws Exception { - List<List<String>> fieldArrays = new ArrayList<>(); - fieldArrays.add(new ArrayList<>()); - fieldArrays.add(Collections.singletonList("ID")); - fieldArrays.add(Arrays.asList("ID", "DOC")); - fieldArrays.add(Arrays.asList("ID", "TS", "DOC")); - fieldArrays.add(Arrays.asList("ID", "TS", "META", "DOC")); - - for( List<String> fields : fieldArrays ) - TestHdfsPersistCase(fields); + List<List<String>> fieldArrays = new ArrayList<>(); + fieldArrays.add(new ArrayList<>()); + fieldArrays.add(Collections.singletonList("ID")); + fieldArrays.add(Arrays.asList("ID", "DOC")); + fieldArrays.add(Arrays.asList("ID", "TS", "DOC")); + fieldArrays.add(Arrays.asList("ID", "TS", "META", "DOC")); + for (List<String> fields : fieldArrays) { + TestHdfsPersistCase(fields); } - public void TestHdfsPersistCase(List<String> fields) throws Exception { - - HdfsConfiguration hdfsConfiguration = new HdfsConfiguration().withScheme(HdfsConfiguration.Scheme.FILE).withHost("localhost").withUser("cloudera").withPath("target/TestHdfsPersist"); - hdfsConfiguration.setFields(fields); - HdfsWriterConfiguration hdfsWriterConfiguration = MAPPER.convertValue(hdfsConfiguration, HdfsWriterConfiguration.class); - if( fields.size() % 2 == 1 ) - hdfsWriterConfiguration.setCompression(HdfsWriterConfiguration.Compression.GZIP); - hdfsWriterConfiguration.setWriterFilePrefix("activities"); - hdfsWriterConfiguration.setWriterPath(Integer.toString(fields.size())); - WebHdfsPersistWriter writer = new WebHdfsPersistWriter(hdfsWriterConfiguration); - - writer.prepare(null); - - InputStream testActivityFolderStream = TestHdfsPersist.class.getClassLoader() - .getResourceAsStream("activities"); - List<String> files = IOUtils.readLines(testActivityFolderStream, StandardCharsets.UTF_8); + } - int count = 0; + public void TestHdfsPersistCase(List<String> fields) throws Exception { - for( String file : files) { - LOGGER.info("File: " + file ); - InputStream testActivityFileStream = TestHdfsPersist.class.getClassLoader() - .getResourceAsStream("activities/" + file); - Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class); - activity.getAdditionalProperties().remove("$license"); - StreamsDatum datum = new StreamsDatum(activity, activity.getVerb()); - writer.write( datum ); - LOGGER.info("Wrote: " + activity.getVerb() ); - count++; - } + HdfsConfiguration hdfsConfiguration = new HdfsConfiguration().withScheme(HdfsConfiguration.Scheme.FILE).withHost("localhost").withUser("cloudera").withPath("target/TestHdfsPersist"); + hdfsConfiguration.setFields(fields); + HdfsWriterConfiguration hdfsWriterConfiguration = MAPPER.convertValue(hdfsConfiguration, HdfsWriterConfiguration.class); + if (fields.size() % 2 == 1) { + hdfsWriterConfiguration.setCompression(HdfsWriterConfiguration.Compression.GZIP); + } + hdfsWriterConfiguration.setWriterFilePrefix("activities"); + hdfsWriterConfiguration.setWriterPath(Integer.toString(fields.size())); + WebHdfsPersistWriter writer = new WebHdfsPersistWriter(hdfsWriterConfiguration); + + writer.prepare(null); + + InputStream testActivityFolderStream = TestHdfsPersist.class.getClassLoader() + .getResourceAsStream("activities"); + List<String> files = IOUtils.readLines(testActivityFolderStream, StandardCharsets.UTF_8); + + int count = 0; + + for (String file : files) { + LOGGER.info("File: " + file); + InputStream testActivityFileStream = TestHdfsPersist.class.getClassLoader() + .getResourceAsStream("activities/" + file); + Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class); + activity.getAdditionalProperties().remove("$license"); + StreamsDatum datum = new StreamsDatum(activity, activity.getVerb()); + writer.write(datum); + LOGGER.info("Wrote: " + activity.getVerb()); + count++; + } - writer.cleanUp(); + writer.cleanUp(); - HdfsReaderConfiguration hdfsReaderConfiguration = MAPPER.convertValue(hdfsConfiguration, HdfsReaderConfiguration.class); + HdfsReaderConfiguration hdfsReaderConfiguration = MAPPER.convertValue(hdfsConfiguration, HdfsReaderConfiguration.class); - WebHdfsPersistReader reader = new WebHdfsPersistReader(hdfsReaderConfiguration); - hdfsReaderConfiguration.setReaderPath(Integer.toString(fields.size())); + WebHdfsPersistReader reader = new WebHdfsPersistReader(hdfsReaderConfiguration); + hdfsReaderConfiguration.setReaderPath(Integer.toString(fields.size())); - reader.prepare(null); + reader.prepare(null); - StreamsResultSet resultSet = reader.readAll(); + StreamsResultSet resultSet = reader.readAll(); - Assert.assertEquals(resultSet.size(), count); + Assert.assertEquals(resultSet.size(), count); - } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-persist-mongo/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-mongo/pom.xml b/streams-contrib/streams-persist-mongo/pom.xml index 3183455..877821d 100644 --- a/streams-contrib/streams-persist-mongo/pom.xml +++ b/streams-contrib/streams-persist-mongo/pom.xml @@ -73,11 +73,6 @@ <scope>test</scope> </dependency> <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-testing</artifactId> <version>${project.version}</version> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-persist-neo4j/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-neo4j/pom.xml b/streams-contrib/streams-persist-neo4j/pom.xml index d117558..ab8d71a 100644 --- a/streams-contrib/streams-persist-neo4j/pom.xml +++ b/streams-contrib/streams-persist-neo4j/pom.xml @@ -92,11 +92,6 @@ <type>test-jar</type> <scope>test</scope> </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> <dependency> <groupId>org.apache.streams</groupId> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailImapProviderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailImapProviderTask.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailImapProviderTask.java deleted file mode 100644 index 168aed7..0000000 --- a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailImapProviderTask.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.google.gmail.provider; - -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.pojo.json.Activity; -import org.apache.streams.util.ComponentUtils; - -import com.googlecode.gmail4j.GmailMessage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -/** - * GMailImapProviderTask collects Gmail via IMAP driver. - */ -public class GMailImapProviderTask implements Runnable { - - private final static Logger LOGGER = LoggerFactory.getLogger(GMailImapProviderTask.class); - - private GMailProvider provider; - - public GMailImapProviderTask(GMailProvider provider) { - this.provider = provider; - } - - @Override - public void run() { - - final List<GmailMessage> messages = this.provider.imapClient.getUnreadMessages(); - - for (GmailMessage message : messages) { - - Activity activity; - GMailMessageActivitySerializer serializer = new GMailMessageActivitySerializer( this.provider ); - activity = serializer.deserialize(message); - StreamsDatum entry = new StreamsDatum(activity); - ComponentUtils.offerUntilSuccess(entry, this.provider.providerQueue); - - } - - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailMessageActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailMessageActivitySerializer.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailMessageActivitySerializer.java deleted file mode 100644 index bdc22db..0000000 --- a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailMessageActivitySerializer.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.google.gmail.provider; - -import org.apache.streams.data.ActivitySerializer; -import org.apache.streams.pojo.extensions.ExtensionUtil; -import org.apache.streams.pojo.json.Activity; -import org.apache.streams.pojo.json.ActivityObject; -import org.apache.streams.pojo.json.Generator; -import org.apache.streams.pojo.json.Icon; -import org.apache.streams.pojo.json.Provider; - -import com.fasterxml.jackson.annotation.JsonBackReference; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonManagedReference; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.googlecode.gmail4j.GmailException; -import com.googlecode.gmail4j.GmailMessage; -import com.googlecode.gmail4j.javamail.JavaMailGmailMessage; -import com.sun.mail.imap.IMAPFolder; -import com.sun.mail.imap.IMAPMessage; -import com.sun.mail.imap.IMAPSSLStore; -import org.apache.commons.lang.NotImplementedException; -import org.joda.time.DateTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import javax.mail.internet.MimeMultipart; - -/** - * GMailMessageActivitySerializer converts a GMail message to Activity. - */ -public class GMailMessageActivitySerializer implements ActivitySerializer<GmailMessage> { - - private static final Logger LOGGER = LoggerFactory.getLogger(GMailMessageActivitySerializer.class); - - private GMailProvider provider; - - public GMailMessageActivitySerializer(GMailProvider provider) { - - this.provider = provider; - - ObjectMapper mapper = new ObjectMapper(); - mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, Boolean.FALSE); - - mapper.addMixInAnnotations(IMAPSSLStore.class, MessageMixIn.class); - mapper.addMixInAnnotations(IMAPFolder.class, MessageMixIn.class); - mapper.addMixInAnnotations(IMAPMessage.class, MessageMixIn.class); - mapper.addMixInAnnotations(MimeMultipart.class, MessageMixIn.class); - mapper.addMixInAnnotations(JavaMailGmailMessage.class, MessageMixIn.class); - - } - - public GMailMessageActivitySerializer() { - } - - @Override - public String serializationFormat() { - return "gmail.v1"; - } - - @Override - public GmailMessage serialize(Activity activity) { - return null; - } - - @Override - public Activity deserialize(GmailMessage gmailMessage) { - - Activity activity = new Activity(); - activity.setId(formatId(this.provider.getConfig().getUserName(), String.valueOf(gmailMessage.getMessageNumber()))); - activity.setPublished(new DateTime(gmailMessage.getSendDate())); - Provider provider = new Provider(); - provider.setId("http://gmail.com"); - provider.setDisplayName("GMail"); - activity.setProvider(provider); - ActivityObject actor = new ActivityObject(); - actor.setId(gmailMessage.getFrom().getEmail()); - actor.setDisplayName(gmailMessage.getFrom().getName()); - activity.setActor(actor); - activity.setVerb("email"); - ActivityObject object = new ActivityObject(); - try { - object.setId(gmailMessage.getTo().get(0).getEmail()); - object.setDisplayName(gmailMessage.getTo().get(0).getName()); - } catch( GmailException e ) { - LOGGER.warn(e.getMessage()); - } - activity.setTitle(gmailMessage.getSubject()); - try { - activity.setContent(gmailMessage.getContentText()); - } catch( GmailException e ) { - LOGGER.warn(e.getMessage()); - } - activity.setObject(object); - return activity; - } - - @Override - public List<Activity> deserializeAll(List<GmailMessage> serializedList) { - throw new NotImplementedException("Not currently implemented"); - } - - public Activity convert(ObjectNode event) { - return null; - } - - public static Generator buildGenerator(ObjectNode event) { - return null; - } - - public static Icon getIcon(ObjectNode event) { - return null; - } - - public static Provider buildProvider(ObjectNode event) { - Provider provider = new Provider(); - provider.setId("id:providers:gmail"); - return provider; - } - - public static List<Object> getLinks(ObjectNode event) { - return null; - } - - public static String getUrls(ObjectNode event) { - return null; - } - - public static void addGMailExtension(Activity activity, GmailMessage gmailMessage) { - Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity); - extensions.put("gmail", gmailMessage); - } - - public static String formatId(String... idparts) { - return String.join(":", - Stream.concat(Arrays.stream(new String[]{"id:googleplus"}), Arrays.stream(idparts)).collect(Collectors.toList())); - } - - interface MessageMixIn { - @JsonManagedReference - @JsonIgnore - IMAPSSLStore getDefaultFolder(); // we don't need it! - @JsonManagedReference - @JsonIgnore - IMAPSSLStore getPersonalNamespaces(); // we don't need it! - @JsonManagedReference - @JsonIgnore - IMAPFolder getStore(); // we don't need it! - // @JsonManagedReference -// @JsonIgnore -// @JsonBackReference - //IMAPFolder getParent(); // we don't need it! - @JsonManagedReference - @JsonIgnore - @JsonBackReference - IMAPMessage getFolder(); // we don't need it! - @JsonManagedReference - @JsonIgnore - @JsonProperty("parent") - @JsonBackReference - MimeMultipart getParent(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java deleted file mode 100644 index bd21acc..0000000 --- a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.google.gmail.provider; - -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.DatumStatusCounter; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProvider; -import org.apache.streams.core.StreamsResultSet; - -import com.google.gmail.GMailConfiguration; -import com.googlecode.gmail4j.GmailClient; -import com.googlecode.gmail4j.GmailConnection; -import com.googlecode.gmail4j.http.HttpGmailConnection; -import com.googlecode.gmail4j.javamail.ImapGmailClient; -import com.googlecode.gmail4j.javamail.ImapGmailConnection; -import com.googlecode.gmail4j.rss.RssGmailClient; -import org.joda.time.DateTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Serializable; -import java.math.BigInteger; -import java.util.Objects; -import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -/** - * GMailProvider collects messages from GMail. - */ -public class GMailProvider implements StreamsProvider, Serializable { - - public final static String STREAMS_ID = "GMailProvider"; - - private final static Logger LOGGER = LoggerFactory.getLogger(GMailProvider.class); - - private GMailConfiguration config; - - private Class klass; - - public GMailConfiguration getConfig() { - return config; - } - - public void setConfig(GMailConfiguration config) { - this.config = config; - } - - protected BlockingQueue inQueue = new LinkedBlockingQueue<>(10000); - - protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<>(); - protected Future task; - - public BlockingQueue<Object> getInQueue() { - return inQueue; - } - - protected GmailClient rssClient; - protected ImapGmailClient imapClient; - - private ExecutorService executor; - - private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { - return new ThreadPoolExecutor(nThreads, nThreads, - 5000L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); - } - - public GMailProvider() { - this.config = new ComponentConfigurator<>(GMailConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("gmail")); - } - - public GMailProvider(GMailConfiguration config) { - this.config = config; - } - - public GMailProvider(Class klass) { - this.config = new ComponentConfigurator<>(GMailConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("gmail")); - this.klass = klass; - } - - public GMailProvider(GMailConfiguration config, Class klass) { - this.config = config; - this.klass = klass; - } - - protected DatumStatusCounter countersTotal = new DatumStatusCounter(); - protected DatumStatusCounter countersCurrent = new DatumStatusCounter(); - - @Override - public String getId() { - return "GMailProvider"; - } - - @Override - public void startStream() { - - task = executor.submit(new GMailImapProviderTask(this)); - - } - - @Override - public StreamsResultSet readCurrent() { - - StreamsResultSet current; - - synchronized( GMailProvider.class ) { - current = new StreamsResultSet(new ConcurrentLinkedQueue<>(providerQueue)); - current.setCounter(new DatumStatusCounter()); - current.getCounter().add(countersCurrent); - countersTotal.add(countersCurrent); - countersCurrent = new DatumStatusCounter(); - providerQueue.clear(); - } - - return current; - } - - @Override - public StreamsResultSet readNew(BigInteger sequence) { - return null; - } - - @Override - public StreamsResultSet readRange(DateTime start, DateTime end) { - return null; - } - - @Override - public boolean isRunning() { - return !task.isDone() && !task.isCancelled(); - } - - @Override - public void prepare(Object configurationObject) { - - Objects.requireNonNull(this.klass); - - Objects.requireNonNull(config.getUserName()); - Objects.requireNonNull(config.getPassword()); - - rssClient = new RssGmailClient(); - GmailConnection rssConnection = new HttpGmailConnection(config.getUserName(), config.getPassword().toCharArray()); - rssClient.setConnection(rssConnection); - - imapClient = new ImapGmailClient(); - GmailConnection imapConnection = new ImapGmailConnection(); - imapConnection.setLoginCredentials(config.getUserName(), config.getPassword().toCharArray()); - imapClient.setConnection(imapConnection); - - executor = Executors.newSingleThreadExecutor(); - - startStream(); - } - - @Override - public void cleanUp() { - try { - executor.awaitTermination(5, TimeUnit.SECONDS); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gmail/src/main/java/org/apache/streams/gmail/provider/GMailImapProviderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/org/apache/streams/gmail/provider/GMailImapProviderTask.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/org/apache/streams/gmail/provider/GMailImapProviderTask.java new file mode 100644 index 0000000..bfebe33 --- /dev/null +++ b/streams-contrib/streams-provider-google/google-gmail/src/main/java/org/apache/streams/gmail/provider/GMailImapProviderTask.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.gmail.provider; + +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.pojo.json.Activity; +import org.apache.streams.util.ComponentUtils; + +import com.googlecode.gmail4j.GmailMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * GMailImapProviderTask collects Gmail via IMAP driver. + */ +public class GMailImapProviderTask implements Runnable { + + private final static Logger LOGGER = LoggerFactory.getLogger(GMailImapProviderTask.class); + + private GMailProvider provider; + + public GMailImapProviderTask(GMailProvider provider) { + this.provider = provider; + } + + @Override + public void run() { + + final List<GmailMessage> messages = this.provider.imapClient.getUnreadMessages(); + + for (GmailMessage message : messages) { + + Activity activity; + GMailMessageActivitySerializer serializer = new GMailMessageActivitySerializer( this.provider ); + activity = serializer.deserialize(message); + StreamsDatum entry = new StreamsDatum(activity); + ComponentUtils.offerUntilSuccess(entry, this.provider.providerQueue); + + } + + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gmail/src/main/java/org/apache/streams/gmail/provider/GMailMessageActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/org/apache/streams/gmail/provider/GMailMessageActivitySerializer.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/org/apache/streams/gmail/provider/GMailMessageActivitySerializer.java new file mode 100644 index 0000000..68b8e35 --- /dev/null +++ b/streams-contrib/streams-provider-google/google-gmail/src/main/java/org/apache/streams/gmail/provider/GMailMessageActivitySerializer.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.gmail.provider; + +import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.pojo.extensions.ExtensionUtil; +import org.apache.streams.pojo.json.Activity; +import org.apache.streams.pojo.json.ActivityObject; +import org.apache.streams.pojo.json.Generator; +import org.apache.streams.pojo.json.Icon; +import org.apache.streams.pojo.json.Provider; + +import com.fasterxml.jackson.annotation.JsonBackReference; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonManagedReference; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.googlecode.gmail4j.GmailException; +import com.googlecode.gmail4j.GmailMessage; +import com.googlecode.gmail4j.javamail.JavaMailGmailMessage; +import com.sun.mail.imap.IMAPFolder; +import com.sun.mail.imap.IMAPMessage; +import com.sun.mail.imap.IMAPSSLStore; +import org.apache.commons.lang.NotImplementedException; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import javax.mail.internet.MimeMultipart; + +/** + * GMailMessageActivitySerializer converts a GMail message to Activity. + */ +public class GMailMessageActivitySerializer implements ActivitySerializer<GmailMessage> { + + private static final Logger LOGGER = LoggerFactory.getLogger(GMailMessageActivitySerializer.class); + + private GMailProvider provider; + + public GMailMessageActivitySerializer(GMailProvider provider) { + + this.provider = provider; + + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, Boolean.FALSE); + + mapper.addMixInAnnotations(IMAPSSLStore.class, MessageMixIn.class); + mapper.addMixInAnnotations(IMAPFolder.class, MessageMixIn.class); + mapper.addMixInAnnotations(IMAPMessage.class, MessageMixIn.class); + mapper.addMixInAnnotations(MimeMultipart.class, MessageMixIn.class); + mapper.addMixInAnnotations(JavaMailGmailMessage.class, MessageMixIn.class); + + } + + public GMailMessageActivitySerializer() { + } + + public static Generator buildGenerator(ObjectNode event) { + return null; + } + + public static Icon getIcon(ObjectNode event) { + return null; + } + + public static Provider buildProvider(ObjectNode event) { + Provider provider = new Provider(); + provider.setId("id:providers:gmail"); + return provider; + } + + public static List<Object> getLinks(ObjectNode event) { + return null; + } + + public static String getUrls(ObjectNode event) { + return null; + } + + public static void addGMailExtension(Activity activity, GmailMessage gmailMessage) { + Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity); + extensions.put("gmail", gmailMessage); + } + + public static String formatId(String... idparts) { + return String.join(":", + Stream.concat(Arrays.stream(new String[] {"id:googleplus"}), Arrays.stream(idparts)).collect(Collectors.toList())); + } + + @Override + public String serializationFormat() { + return "gmail.v1"; + } + + @Override + public GmailMessage serialize(Activity activity) { + return null; + } + + @Override + public Activity deserialize(GmailMessage gmailMessage) { + + Activity activity = new Activity(); + activity.setId(formatId(this.provider.getConfig().getUserName(), String.valueOf(gmailMessage.getMessageNumber()))); + activity.setPublished(new DateTime(gmailMessage.getSendDate())); + Provider provider = new Provider(); + provider.setId("http://gmail.com"); + provider.setDisplayName("GMail"); + activity.setProvider(provider); + ActivityObject actor = new ActivityObject(); + actor.setId(gmailMessage.getFrom().getEmail()); + actor.setDisplayName(gmailMessage.getFrom().getName()); + activity.setActor(actor); + activity.setVerb("email"); + ActivityObject object = new ActivityObject(); + try { + object.setId(gmailMessage.getTo().get(0).getEmail()); + object.setDisplayName(gmailMessage.getTo().get(0).getName()); + } catch (GmailException e) { + LOGGER.warn(e.getMessage()); + } + activity.setTitle(gmailMessage.getSubject()); + try { + activity.setContent(gmailMessage.getContentText()); + } catch (GmailException e) { + LOGGER.warn(e.getMessage()); + } + activity.setObject(object); + return activity; + } + + @Override + public List<Activity> deserializeAll(List<GmailMessage> serializedList) { + throw new NotImplementedException("Not currently implemented"); + } + + public Activity convert(ObjectNode event) { + return null; + } + + interface MessageMixIn { + @JsonManagedReference + @JsonIgnore + IMAPSSLStore getDefaultFolder(); // we don't need it! + + @JsonManagedReference + @JsonIgnore + IMAPSSLStore getPersonalNamespaces(); // we don't need it! + + @JsonManagedReference + @JsonIgnore + IMAPFolder getStore(); // we don't need it! + +// @JsonManagedReference +// @JsonIgnore +// @JsonBackReference + //IMAPFolder getParent(); // we don't need it! + @JsonManagedReference + @JsonIgnore + @JsonBackReference + IMAPMessage getFolder(); // we don't need it! + + @JsonManagedReference + @JsonIgnore + @JsonProperty("parent") + @JsonBackReference + MimeMultipart getParent(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gmail/src/main/java/org/apache/streams/gmail/provider/GMailProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/org/apache/streams/gmail/provider/GMailProvider.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/org/apache/streams/gmail/provider/GMailProvider.java new file mode 100644 index 0000000..06c73fc --- /dev/null +++ b/streams-contrib/streams-provider-google/google-gmail/src/main/java/org/apache/streams/gmail/provider/GMailProvider.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.gmail.provider; + +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.DatumStatusCounter; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProvider; +import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.gmail.GMailConfiguration; + +import com.googlecode.gmail4j.GmailClient; +import com.googlecode.gmail4j.GmailConnection; +import com.googlecode.gmail4j.http.HttpGmailConnection; +import com.googlecode.gmail4j.javamail.ImapGmailClient; +import com.googlecode.gmail4j.javamail.ImapGmailConnection; +import com.googlecode.gmail4j.rss.RssGmailClient; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.math.BigInteger; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * GMailProvider collects messages from GMail. + */ +public class GMailProvider implements StreamsProvider, Serializable { + + public final static String STREAMS_ID = "GMailProvider"; + + private final static Logger LOGGER = LoggerFactory.getLogger(GMailProvider.class); + protected BlockingQueue inQueue = new LinkedBlockingQueue<>(10000); + protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<>(); + protected Future task; + protected GmailClient rssClient; + protected ImapGmailClient imapClient; + protected DatumStatusCounter countersTotal = new DatumStatusCounter(); + protected DatumStatusCounter countersCurrent = new DatumStatusCounter(); + private GMailConfiguration config; + private Class klass; + private ExecutorService executor; + + public GMailProvider() { + this.config = new ComponentConfigurator<>(GMailConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("gmail")); + } + + public GMailProvider(GMailConfiguration config) { + this.config = config; + } + + public GMailProvider(Class klass) { + this.config = new ComponentConfigurator<>(GMailConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("gmail")); + this.klass = klass; + } + + public GMailProvider(GMailConfiguration config, Class klass) { + this.config = config; + this.klass = klass; + } + + private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { + return new ThreadPoolExecutor(nThreads, nThreads, + 5000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); + } + + public GMailConfiguration getConfig() { + return config; + } + + public void setConfig(GMailConfiguration config) { + this.config = config; + } + + public BlockingQueue<Object> getInQueue() { + return inQueue; + } + + @Override + public String getId() { + return "GMailProvider"; + } + + @Override + public void startStream() { + + task = executor.submit(new GMailImapProviderTask(this)); + + } + + @Override + public StreamsResultSet readCurrent() { + + StreamsResultSet current; + + synchronized ( GMailProvider.class ) { + current = new StreamsResultSet(new ConcurrentLinkedQueue<>(providerQueue)); + current.setCounter(new DatumStatusCounter()); + current.getCounter().add(countersCurrent); + countersTotal.add(countersCurrent); + countersCurrent = new DatumStatusCounter(); + providerQueue.clear(); + } + + return current; + } + + @Override + public StreamsResultSet readNew(BigInteger sequence) { + return null; + } + + @Override + public StreamsResultSet readRange(DateTime start, DateTime end) { + return null; + } + + @Override + public boolean isRunning() { + return !task.isDone() && !task.isCancelled(); + } + + @Override + public void prepare(Object configurationObject) { + + Objects.requireNonNull(this.klass); + Objects.requireNonNull(config.getUserName()); + Objects.requireNonNull(config.getPassword()); + + rssClient = new RssGmailClient(); + GmailConnection rssConnection = new HttpGmailConnection(config.getUserName(), config.getPassword().toCharArray()); + rssClient.setConnection(rssConnection); + + imapClient = new ImapGmailClient(); + GmailConnection imapConnection = new ImapGmailConnection(); + imapConnection.setLoginCredentials(config.getUserName(), config.getPassword().toCharArray()); + imapClient.setConnection(imapConnection); + + executor = Executors.newSingleThreadExecutor(); + + startStream(); + } + + @Override + public void cleanUp() { + try { + executor.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gmail/src/main/jsonschema/com/google/gmail/GMailConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/jsonschema/com/google/gmail/GMailConfiguration.json b/streams-contrib/streams-provider-google/google-gmail/src/main/jsonschema/com/google/gmail/GMailConfiguration.json deleted file mode 100644 index 2a6a468..0000000 --- a/streams-contrib/streams-provider-google/google-gmail/src/main/jsonschema/com/google/gmail/GMailConfiguration.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-03/schema", - "$license": [ - "http://www.apache.org/licenses/LICENSE-2.0" - ], - "id": "#", - "javaType" : "com.google.gmail.GMailConfiguration", - "javaInterfaces": ["java.io.Serializable"], - "properties": { - "userName": { - "type": "string" - }, - "password": { - "type": "string" - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gmail/src/main/jsonschema/org/apache/streams/google/gmail/GMailConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/jsonschema/org/apache/streams/google/gmail/GMailConfiguration.json b/streams-contrib/streams-provider-google/google-gmail/src/main/jsonschema/org/apache/streams/google/gmail/GMailConfiguration.json new file mode 100644 index 0000000..614b290 --- /dev/null +++ b/streams-contrib/streams-provider-google/google-gmail/src/main/jsonschema/org/apache/streams/google/gmail/GMailConfiguration.json @@ -0,0 +1,17 @@ +{ + "$schema": "http://json-schema.org/draft-03/schema", + "$license": [ + "http://www.apache.org/licenses/LICENSE-2.0" + ], + "id": "#", + "javaType" : "org.apache.streams.gmail.GMailConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "userName": { + "type": "string" + }, + "password": { + "type": "string" + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java b/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java deleted file mode 100644 index 3a611f7..0000000 --- a/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.google.gmail.test; - -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.junit.Assert; -import org.junit.Ignore; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.InputStream; -import java.io.InputStreamReader; - -/** - * Tests conversion of gmail inputs to Activity - */ -@Ignore("ignore until test resources are available.") -public class GMailMessageSerDeTest { - - private final static Logger LOGGER = LoggerFactory.getLogger(GMailMessageSerDeTest.class); - - private ObjectMapper mapper = new ObjectMapper(); - - @Ignore - @Test - public void Tests() - { - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE); - mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE); - mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE); - - InputStream is = GMailMessageSerDeTest.class.getResourceAsStream("/datasift_jsons.txt"); - InputStreamReader isr = new InputStreamReader(is); - BufferedReader br = new BufferedReader(isr); - - try { - while (br.ready()) { - String line = br.readLine(); - LOGGER.debug(line); - - // implement - } - } catch( Exception e ) { - e.printStackTrace(); - Assert.fail(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gmail/src/test/java/org/apache/streams/gmail/test/GMailMessageSerDeTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gmail/src/test/java/org/apache/streams/gmail/test/GMailMessageSerDeTest.java b/streams-contrib/streams-provider-google/google-gmail/src/test/java/org/apache/streams/gmail/test/GMailMessageSerDeTest.java new file mode 100644 index 0000000..8d3da9e --- /dev/null +++ b/streams-contrib/streams-provider-google/google-gmail/src/test/java/org/apache/streams/gmail/test/GMailMessageSerDeTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.gmail.test; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; + +/** + * Tests conversion of gmail inputs to Activity + */ +@Ignore("ignore until test resources are available.") +public class GMailMessageSerDeTest { + + private final static Logger LOGGER = LoggerFactory.getLogger(GMailMessageSerDeTest.class); + + private ObjectMapper mapper = new ObjectMapper(); + + @Ignore + @Test + public void Tests() + { + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE); + mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE); + mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE); + + InputStream is = GMailMessageSerDeTest.class.getResourceAsStream("/datasift_jsons.txt"); + InputStreamReader isr = new InputStreamReader(is); + BufferedReader br = new BufferedReader(isr); + + try { + while (br.ready()) { + String line = br.readLine(); + LOGGER.debug(line); + + // implement + } + } catch( Exception e ) { + e.printStackTrace(); + Assert.fail(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusCommentProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusCommentProcessor.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusCommentProcessor.java deleted file mode 100644 index 0b84932..0000000 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusCommentProcessor.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.google.gplus.processor; - -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.pojo.json.Activity; - -import com.google.api.services.plus.model.Comment; -import com.google.gplus.serializer.util.GooglePlusActivityUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -/** - * GooglePlusCommentProcessor collects comments about a google plus activity. - */ -public class GooglePlusCommentProcessor implements StreamsProcessor { - - private static final String STREAMS_ID = "GooglePlusCommentProcessor"; - private static final Logger LOGGER = LoggerFactory.getLogger(GooglePlusCommentProcessor.class); - private GooglePlusActivityUtil googlePlusActivityUtil; - private int count; - - @Override - public String getId() { - return STREAMS_ID; - } - - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - StreamsDatum result = null; - - try { - Object item = entry.getDocument(); - LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass()); - - //Get G+ activity ID from our own activity ID - if (item instanceof Activity) { - Activity activity = (Activity) item; - String activityId = getGPlusID(activity.getId()); - - //Call Google Plus API to get list of comments for this activity ID - /* TODO: FILL ME OUT WITH THE API CALL **/ - List<Comment> comments = new ArrayList<>(); - - GooglePlusActivityUtil.updateActivity(comments, activity); - result = new StreamsDatum(activity); - } - } catch (Exception ex) { - ex.printStackTrace(); - LOGGER.error("Exception while converting Comment to Activity: {}", ex.getMessage()); - } - - if ( result != null ) { - return Stream.of(result).collect(Collectors.toList()); - } else { - return new ArrayList<>(); - } - } - - @Override - public void prepare(Object configurationObject) { - googlePlusActivityUtil = new GooglePlusActivityUtil(); - count = 0; - } - - @Override - public void cleanUp() { - - } - - private String getGPlusID(String activityId) { - String[] activityParts = activityId.split(":"); - return (activityParts.length > 0) ? activityParts[activityParts.length - 1] : ""; - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java deleted file mode 100644 index 51a6d51..0000000 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.google.gplus.processor; - -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.pojo.json.Activity; - -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.api.services.plus.model.Person; -import com.google.gplus.serializer.util.GPlusActivityDeserializer; -import com.google.gplus.serializer.util.GPlusEventClassifier; -import com.google.gplus.serializer.util.GPlusPersonDeserializer; -import com.google.gplus.serializer.util.GooglePlusActivityUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.Queue; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -/** - * GooglePlusTypeConverter is a StreamsProcessor that converts gplus activities to activitystreams activities. - */ -public class GooglePlusTypeConverter implements StreamsProcessor { - - public static final String STREAMS_ID = "GooglePlusTypeConverter"; - - private static final Logger LOGGER = LoggerFactory.getLogger(GooglePlusTypeConverter.class); - private StreamsJacksonMapper mapper; - private Queue<Person> inQueue; - private Queue<StreamsDatum> outQueue; - private int count = 0; - - public GooglePlusTypeConverter() {} - - public Queue<StreamsDatum> getProcessorOutputQueue() { - return outQueue; - } - - public void setProcessorInputQueue(Queue<Person> inputQueue) { - inQueue = inputQueue; - } - - @Override - public String getId() { - return STREAMS_ID; - } - - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - StreamsDatum result = null; - - try { - Object item = entry.getDocument(); - - LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass()); - Activity activity = null; - - if (item instanceof String) { - item = deserializeItem(item); - } - - if (item instanceof Person) { - activity = new Activity(); - GooglePlusActivityUtil.updateActivity((Person)item, activity); - } else if (item instanceof com.google.api.services.plus.model.Activity) { - activity = new Activity(); - GooglePlusActivityUtil.updateActivity((com.google.api.services.plus.model.Activity)item, activity); - } - - if (activity != null) { - result = new StreamsDatum(activity); - count++; - } - } catch (Exception ex) { - ex.printStackTrace(); - LOGGER.error("Exception while converting Person to Activity: {}", ex.getMessage()); - } - - if ( result != null ) { - return Stream.of(result).collect(Collectors.toList()); - } else { - return new ArrayList<>(); - } - } - - private Object deserializeItem(Object item) { - try { - Class klass = GPlusEventClassifier.detectClass((String) item); - - if (klass.equals(Person.class)) { - item = mapper.readValue((String) item, Person.class); - } else if (klass.equals(com.google.api.services.plus.model.Activity.class)) { - item = mapper.readValue((String) item, com.google.api.services.plus.model.Activity.class); - } - } catch (Exception ex) { - LOGGER.error("Exception while trying to deserializeItem: {}", ex); - } - - return item; - } - - @Override - public void prepare(Object configurationObject) { - mapper = StreamsJacksonMapper.getInstance(); - - SimpleModule simpleModule = new SimpleModule(); - simpleModule.addDeserializer(Person.class, new GPlusPersonDeserializer()); - mapper.registerModule(simpleModule); - - simpleModule = new SimpleModule(); - simpleModule.addDeserializer(com.google.api.services.plus.model.Activity.class, new GPlusActivityDeserializer()); - mapper.registerModule(simpleModule); - } - - @Override - public void cleanUp() { - //No-op - } -}
