http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/providers/TestGPlusUserDataCollector.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/providers/TestGPlusUserDataCollector.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/providers/TestGPlusUserDataCollector.java new file mode 100644 index 0000000..f81ecc0 --- /dev/null +++ b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/providers/TestGPlusUserDataCollector.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.streams.gplus.providers; + +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.google.gplus.configuration.UserInfo; +import org.apache.streams.gplus.provider.GPlusUserDataCollector; +import org.apache.streams.util.api.requests.backoff.BackOffStrategy; +import org.apache.streams.util.api.requests.backoff.impl.ConstantTimeBackOffStrategy; + +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.services.plus.Plus; +import com.google.api.services.plus.model.Person; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Basic Units for {@link org.apache.streams.gplus.provider.GPlusUserDataCollector}. + */ +public class TestGPlusUserDataCollector { + + private static final String NO_ERROR = "no error"; + + /** + * Test that on success a datum will be added to the queue. 
+ * + * @throws Exception Exception + */ + @Test + public void testSucessfullPull() throws Exception { + Plus plus = createMockPlus(0, null); + BackOffStrategy backOff = new ConstantTimeBackOffStrategy(1); + BlockingQueue<StreamsDatum> datums = new LinkedBlockingQueue<>(); + UserInfo user = new UserInfo(); + user.setUserId("A"); + + GPlusUserDataCollector collector = new GPlusUserDataCollector(plus, backOff, datums, user); + collector.run(); + + Assert.assertEquals(1, datums.size()); + StreamsDatum datum = datums.take(); + Assert.assertNotNull(datum); + Assert.assertEquals(NO_ERROR, datum.getId()); + Assert.assertNotNull(datum.getDocument()); + Assert.assertTrue(datum.getDocument() instanceof String); + } + + /** + * Test that on failure, no datums are output. + * + * @throws Exception Exception + */ + @Test + public void testFail() throws Exception { + Plus plus = createMockPlus(3, mock(GoogleJsonResponseException.class)); + UserInfo user = new UserInfo(); + user.setUserId("A"); + BlockingQueue<StreamsDatum> datums = new LinkedBlockingQueue<>(); + BackOffStrategy backOffStrategy = new ConstantTimeBackOffStrategy(1); + + GPlusUserDataCollector collector = new GPlusUserDataCollector(plus, backOffStrategy, datums, user); + collector.run(); + + Assert.assertEquals(0, datums.size()); + } + + private Plus createMockPlus(final int succedOnTry, final Throwable throwable) { + Plus plus = mock(Plus.class); + doAnswer(invocationOnMock -> createMockPeople(succedOnTry, throwable)).when(plus).people(); + return plus; + } + + private Plus.People createMockPeople(final int succedOnTry, final Throwable throwable) { + Plus.People people = mock(Plus.People.class); + try { + when(people.get(anyString())).thenAnswer(invocationOnMock -> createMockGetNoError(succedOnTry, throwable)); + } catch (IOException ioe) { + Assert.fail("No Excpetion should have been thrown while creating mocks"); + } + return people; + } + + private Plus.People.Get createMockGetNoError(final int succedOnTry, 
final Throwable throwable) { + Plus.People.Get get = mock(Plus.People.Get.class); + try { + doAnswer(new Answer() { + private int counter = 0; + + @Override + public Person answer(InvocationOnMock invocationOnMock) throws Throwable { + if (counter == succedOnTry) { + Person person = new Person(); + person.setId(NO_ERROR); + return person; + } else { + ++counter; + throw throwable; + } + } + }).when(get).execute(); + } catch (IOException ioe) { + Assert.fail("No Excpetion should have been thrown while creating mocks"); + } + return get; + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/serializer/util/GPlusEventClassifierTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/serializer/util/GPlusEventClassifierTest.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/serializer/util/GPlusEventClassifierTest.java new file mode 100644 index 0000000..1a9720d --- /dev/null +++ b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/serializer/util/GPlusEventClassifierTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.gplus.serializer.util; + +import org.apache.streams.jackson.StreamsJacksonMapper; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.api.services.plus.model.Activity; +import com.google.api.services.plus.model.Person; +import org.junit.Assert; +import org.junit.Test; + +/** + * GPlusEventClassifierTest tests GPlusEventClassifier. 
+ */ +public class GPlusEventClassifierTest { + + private static StreamsJacksonMapper mapper = StreamsJacksonMapper.getInstance(); + + @Test + public void classifyActivityTest() { + try { + Activity activity = new Activity(); + activity.setKind("plus#activity"); + Class retClass = GPlusEventClassifier.detectClass(mapper.writeValueAsString(activity)); + Assert.assertEquals(retClass, Activity.class); + } catch (Exception ex) { + // + } + } + + @Test + public void classifyPersonTest() { + try { + Person person = new Person(); + person.setKind("plus#person"); + Class retClass = GPlusEventClassifier.detectClass(mapper.writeValueAsString(person)); + Assert.assertEquals(retClass, Person.class); + } catch (Exception ex) { + // + } + } + + @Test + public void classifyObjectNodeTest() { + try { + Person person = new Person(); + person.setKind("fake"); + Class retClass = GPlusEventClassifier.detectClass(mapper.writeValueAsString(person)); + Assert.assertEquals(retClass, ObjectNode.class); + } catch (Exception ex) { + // + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/processors/GooglePlusTypeConverterIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/processors/GooglePlusTypeConverterIT.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/processors/GooglePlusTypeConverterIT.java deleted file mode 100644 index 098f555..0000000 --- a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/processors/GooglePlusTypeConverterIT.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.gplus.test.processors; - -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.exceptions.ActivitySerializerException; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.pojo.json.Activity; - -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.api.services.plus.model.Person; -import com.google.gplus.processor.GooglePlusTypeConverter; -import com.google.gplus.serializer.util.GPlusActivityDeserializer; -import com.google.gplus.serializer.util.GPlusPersonDeserializer; -import com.google.gplus.serializer.util.GooglePlusActivityUtil; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.List; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertTrue; - -/** - * Tests conversion of gplus inputs to Activity - 
*/ -public class GooglePlusTypeConverterIT { - - private final static Logger LOGGER = LoggerFactory.getLogger(GooglePlusTypeConverterIT.class); - private GooglePlusTypeConverter googlePlusTypeConverter; - private ObjectMapper objectMapper; - - @BeforeClass - public void setup() { - objectMapper = StreamsJacksonMapper.getInstance(); - SimpleModule simpleModule = new SimpleModule(); - simpleModule.addDeserializer(Person.class, new GPlusPersonDeserializer()); - simpleModule.addDeserializer(com.google.api.services.plus.model.Activity.class, new GPlusActivityDeserializer()); - objectMapper.registerModule(simpleModule); - objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - - googlePlusTypeConverter = new GooglePlusTypeConverter(); - googlePlusTypeConverter.prepare(null); - } - - @Test(dependsOnGroups={"testGPlusUserDataProvider"}) - public void testProcessPerson() throws IOException, ActivitySerializerException { - - File file = new File("target/test-classes/GPlusUserDataProviderIT.stdout.txt"); - InputStream is = new FileInputStream(file); - InputStreamReader isr = new InputStreamReader(is); - BufferedReader br = new BufferedReader(isr); - - while (br.ready()) { - String line = br.readLine(); - if (!StringUtils.isEmpty(line)) { - LOGGER.info("raw: {}", line); - Activity activity = new Activity(); - - Person person = objectMapper.readValue(line, Person.class); - StreamsDatum streamsDatum = new StreamsDatum(person); - - assertNotNull(streamsDatum.getDocument()); - - List<StreamsDatum> retList = googlePlusTypeConverter.process(streamsDatum); - GooglePlusActivityUtil.updateActivity(person, activity); - - assertEquals(retList.size(), 1); - assert(retList.get(0).getDocument() instanceof Activity); - assertEquals(activity, retList.get(0).getDocument()); - } - } - } - - @Test(dependsOnGroups={"testGPlusUserActivityProvider"}) - public void testProcessActivity() throws IOException, ActivitySerializerException{ - - File file = new 
File("target/test-classes/GPlusUserActivityProviderIT.stdout.txt"); - InputStream is = new FileInputStream(file); - InputStreamReader isr = new InputStreamReader(is); - BufferedReader br = new BufferedReader(isr); - - while (br.ready()) { - String line = br.readLine(); - if (!StringUtils.isEmpty(line)) { - LOGGER.info("raw: {}", line); - Activity activity = new Activity(); - - com.google.api.services.plus.model.Activity gPlusActivity = objectMapper.readValue(line, com.google.api.services.plus.model.Activity.class); - StreamsDatum streamsDatum = new StreamsDatum(gPlusActivity); - - assertNotNull(streamsDatum.getDocument()); - - List<StreamsDatum> retList = googlePlusTypeConverter.process(streamsDatum); - GooglePlusActivityUtil.updateActivity(gPlusActivity, activity); - - assertEquals(retList.size(), 1); - assertTrue(retList.get(0).getDocument() instanceof Activity); - assertEquals(activity, retList.get(0).getDocument()); - } - } - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java deleted file mode 100644 index 71e825f..0000000 --- a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.gplus.test.providers; - -import com.google.gplus.provider.GPlusUserActivityProvider; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.annotations.Test; - -import java.io.File; -import java.io.FileReader; -import java.io.LineNumberReader; - -public class GPlusUserActivityProviderIT { - - private static final Logger LOGGER = LoggerFactory.getLogger(GPlusUserActivityProviderIT.class); - - @Test(groups={"testGPlusUserActivityProvider"}) - public void testGPlusUserActivityProvider() throws Exception { - - String configfile = "./target/test-classes/GPlusUserActivityProviderIT.conf"; - String outfile = "./target/test-classes/GPlusUserActivityProviderIT.stdout.txt"; - - String[] args = new String[2]; - args[0] = configfile; - args[1] = outfile; - - Thread testThread = new Thread(() -> { - try { - GPlusUserActivityProvider.main(args); - } catch ( Exception ex ) { - LOGGER.error("Test Exception!", ex); - } - }); - testThread.start(); - testThread.join(60000); - - File out = new File(outfile); - assert (out.exists()); - assert (out.canRead()); - assert (out.isFile()); - - FileReader outReader = new FileReader(out); - LineNumberReader outCounter = new LineNumberReader(outReader); - - while (outCounter.readLine() != null) {} - - assert (outCounter.getLineNumber() >= 1); - - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserDataProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserDataProviderIT.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserDataProviderIT.java deleted file mode 100644 index 031716e..0000000 --- a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserDataProviderIT.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.streams.gplus.test.providers; - -import com.google.gplus.provider.GPlusUserDataProvider; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.annotations.Test; - -import java.io.File; -import java.io.FileReader; -import java.io.LineNumberReader; - -public class GPlusUserDataProviderIT { - - private static final Logger LOGGER = LoggerFactory.getLogger(GPlusUserDataProviderIT.class); - - @Test(groups={"testGPlusUserDataProvider"}) - public void testGPlusUserDataProvider() throws Exception { - - String configfile = "./target/test-classes/GPlusUserDataProviderIT.conf"; - String outfile = "./target/test-classes/GPlusUserDataProviderIT.stdout.txt"; - - String[] args = new String[2]; - args[0] = configfile; - args[1] = outfile; - - Thread testThread = new Thread(() -> { - try { - GPlusUserDataProvider.main(args); - } catch ( Exception ex ) { - LOGGER.error("Test Exception!", ex); - } - }); - testThread.start(); - testThread.join(60000); - - GPlusUserDataProvider.main(new String[]{configfile, outfile}); - - File out = new File(outfile); - assert (out.exists()); - assert (out.canRead()); - assert (out.isFile()); - - FileReader outReader = new FileReader(out); - LineNumberReader outCounter = new LineNumberReader(outReader); - - while (outCounter.readLine() != null) {} - - assert (outCounter.getLineNumber() >= 1); - - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json deleted file mode 100644 index 891c1da..0000000 --- 
a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json +++ /dev/null @@ -1,71 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-03/schema", - "$license": [ - "http://www.apache.org/licenses/LICENSE-2.0" - ], - "id": "#", - "type": "object", - "javaType" : "org.apache.streams.instagram.InstagramConfiguration", - "javaInterfaces": ["java.io.Serializable"], - "properties": { - "clientId": { - "type": "string", - "description": "Your Instagram Client Id" - }, - "usersInfo": { - "type": "object", - "javaInterfaces" : ["java.io.Serializable"], - "properties": { - "authorizedTokens": { - "type": "array", - "uniqueItems": true, - "items": { - "type": "string" - }, - "description": "Instagram tokens for authorized users of your client/app" - }, - "users": { - "type": "array", - "uniqueItems": true, - "items": { - "type": "object", - "$ref": "#/definitions/user" - }, - "description": "List of user ids to gather data for. Type of data gathered depends on provider" - }, - "defaultAfterDate": { - "type": "string", - "format": "date-time", - "description": "If the api allows to gather data by date range, this date will be used as the start of the range for the request for all users that don't have date ranges specified. If this is null it will pull from the earliest possible time" - }, - "defaultBeforeDate": { - "type": "string", - "format": "date-time", - "description": "If the api allows to gather data by date range, this date will be used as the end of the range for the request for all users that don't have date ranges specified. If this is null it will pull till current time." 
- } - } - } - }, - "definitions": { - "user": { - "type": "object", - "javaInterfaces" : ["java.io.Serializable"], - "properties": { - "userId": { - "type": "string", - "description": "instagram user id" - }, - "afterDate": { - "type": "string", - "format": "date-time", - "description": "If the api allows to gather data by date range, this date will be used as the start of the range for the request for this user. If this is null it will use the defaultBeforeDate." - }, - "beforeDate": { - "type": "string", - "format": "date-time", - "description": "If the api allows to gather data by date range, this date will be used as the end of the range for the request for this user.. If this is null it will use the defaultAfterDate." - } - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramUserInformationConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramUserInformationConfiguration.json b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramUserInformationConfiguration.json deleted file mode 100644 index 42eb5d6..0000000 --- a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramUserInformationConfiguration.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-03/schema", - "$license": [ - "http://www.apache.org/licenses/LICENSE-2.0" - ], - "id": "#", - "type": "object", - "javaType" : "org.apache.streams.instagram.InstagramUserInformationConfiguration", - "extends": {"$ref":"InstagramConfiguration.json"}, - "javaInterfaces": ["java.io.Serializable"], - "properties": { - "userIds": { - "type": "array", - "description": "A list of user IDs, indicating the users whose posts should be delivered on the 
stream", - "items": { - "type": "string" - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-instagram/src/main/jsonschema/org/apache/streams/instagram/InstagramConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/main/jsonschema/org/apache/streams/instagram/InstagramConfiguration.json b/streams-contrib/streams-provider-instagram/src/main/jsonschema/org/apache/streams/instagram/InstagramConfiguration.json new file mode 100644 index 0000000..891c1da --- /dev/null +++ b/streams-contrib/streams-provider-instagram/src/main/jsonschema/org/apache/streams/instagram/InstagramConfiguration.json @@ -0,0 +1,71 @@ +{ + "$schema": "http://json-schema.org/draft-03/schema", + "$license": [ + "http://www.apache.org/licenses/LICENSE-2.0" + ], + "id": "#", + "type": "object", + "javaType" : "org.apache.streams.instagram.InstagramConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "clientId": { + "type": "string", + "description": "Your Instagram Client Id" + }, + "usersInfo": { + "type": "object", + "javaInterfaces" : ["java.io.Serializable"], + "properties": { + "authorizedTokens": { + "type": "array", + "uniqueItems": true, + "items": { + "type": "string" + }, + "description": "Instagram tokens for authorized users of your client/app" + }, + "users": { + "type": "array", + "uniqueItems": true, + "items": { + "type": "object", + "$ref": "#/definitions/user" + }, + "description": "List of user ids to gather data for. Type of data gathered depends on provider" + }, + "defaultAfterDate": { + "type": "string", + "format": "date-time", + "description": "If the api allows to gather data by date range, this date will be used as the start of the range for the request for all users that don't have date ranges specified. 
If this is null it will pull from the earliest possible time" + }, + "defaultBeforeDate": { + "type": "string", + "format": "date-time", + "description": "If the api allows to gather data by date range, this date will be used as the end of the range for the request for all users that don't have date ranges specified. If this is null it will pull till current time." + } + } + } + }, + "definitions": { + "user": { + "type": "object", + "javaInterfaces" : ["java.io.Serializable"], + "properties": { + "userId": { + "type": "string", + "description": "instagram user id" + }, + "afterDate": { + "type": "string", + "format": "date-time", + "description": "If the api allows to gather data by date range, this date will be used as the start of the range for the request for this user. If this is null it will use the defaultBeforeDate." + }, + "beforeDate": { + "type": "string", + "format": "date-time", + "description": "If the api allows to gather data by date range, this date will be used as the end of the range for the request for this user.. If this is null it will use the defaultAfterDate." 
+ } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-instagram/src/main/jsonschema/org/apache/streams/instagram/InstagramUserInformationConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/main/jsonschema/org/apache/streams/instagram/InstagramUserInformationConfiguration.json b/streams-contrib/streams-provider-instagram/src/main/jsonschema/org/apache/streams/instagram/InstagramUserInformationConfiguration.json new file mode 100644 index 0000000..42eb5d6 --- /dev/null +++ b/streams-contrib/streams-provider-instagram/src/main/jsonschema/org/apache/streams/instagram/InstagramUserInformationConfiguration.json @@ -0,0 +1,20 @@ +{ + "$schema": "http://json-schema.org/draft-03/schema", + "$license": [ + "http://www.apache.org/licenses/LICENSE-2.0" + ], + "id": "#", + "type": "object", + "javaType" : "org.apache.streams.instagram.InstagramUserInformationConfiguration", + "extends": {"$ref":"InstagramConfiguration.json"}, + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "userIds": { + "type": "array", + "description": "A list of user IDs, indicating the users whose posts should be delivered on the stream", + "items": { + "type": "string" + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-moreover/src/main/jsonschema/com/moreover/Moreover.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/jsonschema/com/moreover/Moreover.json b/streams-contrib/streams-provider-moreover/src/main/jsonschema/com/moreover/Moreover.json deleted file mode 100644 index 8708e82..0000000 --- a/streams-contrib/streams-provider-moreover/src/main/jsonschema/com/moreover/Moreover.json +++ /dev/null @@ -1,337 +0,0 
@@ -{ - "$schema": "http://json-schema.org/draft-03/schema", - "$license": [ - "http://www.apache.org/licenses/LICENSE-2.0" - ], - "id": "#", - "type": "object", - "javaType": "com.moreover.Moreover", - "javaInterfaces": ["java.io.Serializable"], - "properties": { - "tags": { - "type": "string" - }, - "publishingPlatform": { - "properties": { - "totalViews": { - "type": "string" - }, - "itemId": { - "type": "string" - } - } - }, - "loginStatus": { - "type": "string" - }, - "duplicateGroupId": { - "type": "string" - }, - "companies": { - "properties": { - "symbol": { - "type": "string" - }, - "primary": { - "type": "string" - }, - "name": { - "type": "string" - }, - "contentCount": { - "type": "string" - }, - "exchange": { - "type": "string" - }, - "titleCount": { - "type": "string" - }, - "isin": { - "type": "string" - } - } - }, - "copyright": { - "type": "string" - }, - "url": { - "type": "string" - }, - "content": { - "type": "string" - }, - "id": { - "type": "string" - }, - "author": { - "properties": { - "publishingPlatform": { - "properties": { - "userId": { - "type": "string" - }, - "totalViews": { - "type": "string" - } - } - }, - "dateLastActive": { - "type": "string" - }, - "description": { - "type": "string" - }, - "name": { - "type": "string" - } - } - }, - "topics": { - "properties": { - "name": { - "type": "string" - }, - "group": { - "type": "string" - } - } - }, - "title": { - "type": "string" - }, - "source": { - "properties": { - "category": { - "type": "string" - }, - "location": { - "properties": { - "region": { - "type": "string" - }, - "subregion": { - "type": "string" - }, - "zipCode": { - "type": "string" - }, - "state": { - "type": "string" - }, - "countryCode": { - "type": "string" - }, - "zipArea": { - "type": "string" - }, - "country": { - "type": "string" - } - } - }, - "editorialRank": { - "type": "string" - }, - "name": { - "type": "string" - }, - "feed": { - "properties": { - "tags": { - "type": "string" - }, - "genre": { - "type": 
"string" - }, - "publishingPlatform": { - "type": "string" - }, - "inWhiteList": { - "type": "string" - }, - "imageUrl": { - "type": "string" - }, - "copyright": { - "type": "string" - }, - "mediaType": { - "type": "string" - }, - "id": { - "type": "string" - }, - "rank": { - "properties": { - "autoRankOrder": { - "type": "string" - }, - "inboundLinkCount": { - "type": "string" - }, - "autoRank": { - "type": "string" - } - } - }, - "description": { - "type": "string" - }, - "idFromPublisher": { - "type": "string" - }, - "name": { - "type": "string" - }, - "dataFormat": { - "type": "string" - }, - "generator": { - "type": "string" - }, - "autoTopics": { - "type": "string" - }, - "language": { - "type": "string" - }, - "editorialTopics": { - "type": "string" - } - } - }, - "homeUrl": { - "type": "string" - }, - "publisher": { - "type": "string" - } - } - }, - "locations": { - "properties": { - "region": { - "type": "string" - }, - "subregion": { - "type": "string" - }, - "name": { - "type": "string" - }, - "state": { - "properties": { - "name": { - "type": "string" - }, - "fipsCode": { - "type": "string" - }, - "confidence": { - "type": "string" - } - } - }, - "longitude": { - "type": "string" - }, - "latitude": { - "type": "string" - }, - "confidence": { - "type": "string" - }, - "type": { - "type": "string" - }, - "mentions": { - "type": "string" - }, - "country": { - "properties": { - "name": { - "type": "string" - }, - "fipsCode": { - "type": "string" - }, - "isoCode": { - "type": "string" - }, - "confidence": { - "type": "string" - } - } - } - } - }, - "commentsUrl": { - "type": "string" - }, - "dataFormat": { - "type": "string" - }, - "outboundUrls": { - "type": "string" - }, - "sequenceId": { - "type": "string" - }, - "publishedDate": { - "type": "string" - }, - "language": { - "type": "string" - }, - "adultLanguage": { - "type": "string" - }, - "harvestDate": { - "type": "string" - }, - "media": { - "properties": { - "duration": { - "type": "string" - }, - 
"audio": { - "properties": { - "url": { - "type": "string" - }, - "mimeType": { - "type": "string" - } - } - }, - "image": { - "properties": { - "url": { - "type": "string" - } - } - }, - "caption": { - "type": "string" - }, - "video": { - "properties": { - "url": { - "type": "string" - }, - "mimeType": { - "type": "string" - } - } - }, - "url": { - "type": "string" - }, - "mimeType": { - "type": "string" - } - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-moreover/src/main/jsonschema/com/moreover/MoreoverConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/jsonschema/com/moreover/MoreoverConfiguration.json b/streams-contrib/streams-provider-moreover/src/main/jsonschema/com/moreover/MoreoverConfiguration.json deleted file mode 100644 index bbbda5d..0000000 --- a/streams-contrib/streams-provider-moreover/src/main/jsonschema/com/moreover/MoreoverConfiguration.json +++ /dev/null @@ -1,38 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-03/schema", - "$license": [ - "http://www.apache.org/licenses/LICENSE-2.0" - ], - "id": "#", - "type": "object", - "javaType" : "org.apache.streams.moreover.MoreoverConfiguration", - "javaInterfaces": ["java.io.Serializable"], - "properties": { - "apiKeys": { - "type": "array", - "minItems": 1, - "items": { - "type": "object", - "javaType" : "org.apache.streams.moreover.MoreoverKeyData", - "javaInterfaces": ["java.io.Serializable"], - "properties": { - "id": { - "type": "string" - }, - "key": { - "type": "string" - }, - "startingSequence": { - "type": "string" - } - } - } - }, - "maxBatchSize": { - "type": "long" - }, - "minDelaySeconds": { - "type": "long" - } - } -} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-moreover/src/main/jsonschema/org/apache/streams/moreover/Moreover.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/jsonschema/org/apache/streams/moreover/Moreover.json b/streams-contrib/streams-provider-moreover/src/main/jsonschema/org/apache/streams/moreover/Moreover.json new file mode 100644 index 0000000..ee60b3e --- /dev/null +++ b/streams-contrib/streams-provider-moreover/src/main/jsonschema/org/apache/streams/moreover/Moreover.json @@ -0,0 +1,337 @@ +{ + "$schema": "http://json-schema.org/draft-03/schema", + "$license": [ + "http://www.apache.org/licenses/LICENSE-2.0" + ], + "id": "#", + "type": "object", + "javaType": "org.apache.streams.moreover.Moreover", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "tags": { + "type": "string" + }, + "publishingPlatform": { + "properties": { + "totalViews": { + "type": "string" + }, + "itemId": { + "type": "string" + } + } + }, + "loginStatus": { + "type": "string" + }, + "duplicateGroupId": { + "type": "string" + }, + "companies": { + "properties": { + "symbol": { + "type": "string" + }, + "primary": { + "type": "string" + }, + "name": { + "type": "string" + }, + "contentCount": { + "type": "string" + }, + "exchange": { + "type": "string" + }, + "titleCount": { + "type": "string" + }, + "isin": { + "type": "string" + } + } + }, + "copyright": { + "type": "string" + }, + "url": { + "type": "string" + }, + "content": { + "type": "string" + }, + "id": { + "type": "string" + }, + "author": { + "properties": { + "publishingPlatform": { + "properties": { + "userId": { + "type": "string" + }, + "totalViews": { + "type": "string" + } + } + }, + "dateLastActive": { + "type": "string" + }, + "description": { + "type": "string" + }, + "name": { + "type": "string" + } + } + }, + "topics": { + "properties": { + "name": { 
+ "type": "string" + }, + "group": { + "type": "string" + } + } + }, + "title": { + "type": "string" + }, + "source": { + "properties": { + "category": { + "type": "string" + }, + "location": { + "properties": { + "region": { + "type": "string" + }, + "subregion": { + "type": "string" + }, + "zipCode": { + "type": "string" + }, + "state": { + "type": "string" + }, + "countryCode": { + "type": "string" + }, + "zipArea": { + "type": "string" + }, + "country": { + "type": "string" + } + } + }, + "editorialRank": { + "type": "string" + }, + "name": { + "type": "string" + }, + "feed": { + "properties": { + "tags": { + "type": "string" + }, + "genre": { + "type": "string" + }, + "publishingPlatform": { + "type": "string" + }, + "inWhiteList": { + "type": "string" + }, + "imageUrl": { + "type": "string" + }, + "copyright": { + "type": "string" + }, + "mediaType": { + "type": "string" + }, + "id": { + "type": "string" + }, + "rank": { + "properties": { + "autoRankOrder": { + "type": "string" + }, + "inboundLinkCount": { + "type": "string" + }, + "autoRank": { + "type": "string" + } + } + }, + "description": { + "type": "string" + }, + "idFromPublisher": { + "type": "string" + }, + "name": { + "type": "string" + }, + "dataFormat": { + "type": "string" + }, + "generator": { + "type": "string" + }, + "autoTopics": { + "type": "string" + }, + "language": { + "type": "string" + }, + "editorialTopics": { + "type": "string" + } + } + }, + "homeUrl": { + "type": "string" + }, + "publisher": { + "type": "string" + } + } + }, + "locations": { + "properties": { + "region": { + "type": "string" + }, + "subregion": { + "type": "string" + }, + "name": { + "type": "string" + }, + "state": { + "properties": { + "name": { + "type": "string" + }, + "fipsCode": { + "type": "string" + }, + "confidence": { + "type": "string" + } + } + }, + "longitude": { + "type": "string" + }, + "latitude": { + "type": "string" + }, + "confidence": { + "type": "string" + }, + "type": { + "type": "string" + }, 
+ "mentions": { + "type": "string" + }, + "country": { + "properties": { + "name": { + "type": "string" + }, + "fipsCode": { + "type": "string" + }, + "isoCode": { + "type": "string" + }, + "confidence": { + "type": "string" + } + } + } + } + }, + "commentsUrl": { + "type": "string" + }, + "dataFormat": { + "type": "string" + }, + "outboundUrls": { + "type": "string" + }, + "sequenceId": { + "type": "string" + }, + "publishedDate": { + "type": "string" + }, + "language": { + "type": "string" + }, + "adultLanguage": { + "type": "string" + }, + "harvestDate": { + "type": "string" + }, + "media": { + "properties": { + "duration": { + "type": "string" + }, + "audio": { + "properties": { + "url": { + "type": "string" + }, + "mimeType": { + "type": "string" + } + } + }, + "image": { + "properties": { + "url": { + "type": "string" + } + } + }, + "caption": { + "type": "string" + }, + "video": { + "properties": { + "url": { + "type": "string" + }, + "mimeType": { + "type": "string" + } + } + }, + "url": { + "type": "string" + }, + "mimeType": { + "type": "string" + } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-moreover/src/main/jsonschema/org/apache/streams/moreover/MoreoverConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/jsonschema/org/apache/streams/moreover/MoreoverConfiguration.json b/streams-contrib/streams-provider-moreover/src/main/jsonschema/org/apache/streams/moreover/MoreoverConfiguration.json new file mode 100644 index 0000000..bbbda5d --- /dev/null +++ b/streams-contrib/streams-provider-moreover/src/main/jsonschema/org/apache/streams/moreover/MoreoverConfiguration.json @@ -0,0 +1,38 @@ +{ + "$schema": "http://json-schema.org/draft-03/schema", + "$license": [ + "http://www.apache.org/licenses/LICENSE-2.0" + ], + "id": "#", + "type": "object", + "javaType" : 
"org.apache.streams.moreover.MoreoverConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "apiKeys": { + "type": "array", + "minItems": 1, + "items": { + "type": "object", + "javaType" : "org.apache.streams.moreover.MoreoverKeyData", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "id": { + "type": "string" + }, + "key": { + "type": "string" + }, + "startingSequence": { + "type": "string" + } + } + } + }, + "maxBatchSize": { + "type": "long" + }, + "minDelaySeconds": { + "type": "long" + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java index 779c2ab..bf4720a 100644 --- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java +++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java @@ -21,6 +21,7 @@ package org.apache.streams.rss.provider.perpetual; import org.apache.streams.rss.FeedDetails; import org.apache.streams.rss.provider.RssStreamProviderTask; +import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; @@ -29,7 +30,6 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; -import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -55,20 +55,20 @@ public class RssFeedSchedulerTest { RssFeedScheduler 
scheduler = new RssFeedScheduler(mockService, createFeedList(), new LinkedBlockingQueue<>(), 1); scheduler.scheduleFeeds(); - assertEquals("Expected 2 Feeds to be scheduled", 2, queuedTasks.size()); - assertEquals("Expected Feed 1 to be queued first", "1", queuedTasks.get(0)); - assertEquals("Expected Feed 2 to be queued second", "2", queuedTasks.get(1)); + Assert.assertEquals("Expected 2 Feeds to be scheduled", 2, queuedTasks.size()); + Assert.assertEquals("Expected Feed 1 to be queued first", "1", queuedTasks.get(0)); + Assert.assertEquals("Expected Feed 2 to be queued second", "2", queuedTasks.get(1)); safeSleep(1); scheduler.scheduleFeeds(); - assertEquals("Only feed 1 should have been re-queued", 3, queuedTasks.size()); - assertEquals("Only feed 1 should have been re-queued", "1", queuedTasks.get(2)); + Assert.assertEquals("Only feed 1 should have been re-queued", 3, queuedTasks.size()); + Assert.assertEquals("Only feed 1 should have been re-queued", "1", queuedTasks.get(2)); safeSleep(60 * 1000); scheduler.scheduleFeeds(); - assertEquals("Both feeds should have been re-queued", 5, queuedTasks.size()); - assertEquals("1", queuedTasks.get(3)); - assertEquals("2", queuedTasks.get(4)); + Assert.assertEquals("Both feeds should have been re-queued", 5, queuedTasks.size()); + Assert.assertEquals("1", queuedTasks.get(3)); + Assert.assertEquals("2", queuedTasks.get(4)); } private List<FeedDetails> createFeedList() { @@ -79,7 +79,7 @@ public class RssFeedSchedulerTest { list.add(fd); fd = new FeedDetails(); - fd.setPollIntervalMillis( 60L * 1000); + fd.setPollIntervalMillis(60L * 1000); fd.setUrl("2"); list.add(fd); return list; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java ---------------------------------------------------------------------- diff --git 
a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java index fffe7a1..a77a129 100644 --- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java +++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java @@ -27,13 +27,13 @@ import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; import org.apache.streams.data.util.RFC3339Utils; import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.sysomos.SysomosConfiguration; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; import com.google.common.util.concurrent.Uninterruptibles; -import com.sysomos.SysomosConfiguration; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigParseOptions; @@ -61,24 +61,17 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; /** * Streams Provider for the Sysomos Heartbeat API - * * <p/> * Configuration: * The provider takes either a Map[String,Object] containing the mode (backfill and terminate OR continuous) and a * Map[String,String] of heartbeat IDs to document target ids or a string of the format - * ${heartbeatId}:${documentId},...,${heartbeatId}:${documentId} + * ${heartbeatId}:${documentId},...,${heartbeatId}:${documentId} * This configuration will configure the provider to backfill to the specified document and either terminate or not * depending on the mode flag. Continuous mode is assumed, and is the ony mode supported by the String configuration. 
- * */ public class SysomosProvider implements StreamsProvider { public static final String STREAMS_ID = "SysomosProvider"; - - public enum Mode { CONTINUOUS, BACKFILL_AND_TERMINATE } - - private static final Logger LOGGER = LoggerFactory.getLogger(SysomosProvider.class); - public static final String ENDING_TIME_KEY = "addedBefore"; public static final String STARTING_TIME_KEY = "addedAfter"; public static final String MODE_KEY = "mode"; @@ -86,16 +79,14 @@ public class SysomosProvider implements StreamsProvider { public static final int LATENCY = 10000; //Default minLatency for querying the Sysomos API in milliseconds public static final long PROVIDER_BATCH_SIZE = 10000L; //Default maximum size of the queue public static final long API_BATCH_SIZE = 1000L; //Default maximum size of an API request - - protected volatile Queue<StreamsDatum> providerQueue; - + private static final Logger LOGGER = LoggerFactory.getLogger(SysomosProvider.class); private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final Set<String> completedHeartbeats = new HashSet<>(); private final long maxQueued; private final long minLatency; private final long scheduledLatency; private final long maxApiBatch; - + protected volatile Queue<StreamsDatum> providerQueue; private SysomosClient client; private SysomosConfiguration config; private ScheduledExecutorService stream; @@ -105,7 +96,6 @@ public class SysomosProvider implements StreamsProvider { private Mode mode = Mode.CONTINUOUS; private boolean started = false; private AtomicInteger count; - /** * SysomosProvider constructor. 
* @param sysomosConfiguration SysomosConfiguration @@ -121,6 +111,57 @@ public class SysomosProvider implements StreamsProvider { this.count = new AtomicInteger(); } + /** + * To use from command line: + * <p/> + * Supply configuration similar to src/test/resources/rss.conf + * <p/> + * Launch using: + * <p/> + * mvn exec:java -Dexec.mainClass=org.apache.streams.rss.provider.RssStreamProvider -Dexec.args="rss.conf articles.json" + * @param args args + * @throws Exception Exception + */ + public static void main(String[] args) throws Exception { + + Preconditions.checkArgument(args.length >= 2); + + String configfile = args[0]; + String outfile = args[1]; + + Config reference = ConfigFactory.load(); + File file = new File(configfile); + assert (file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false)); + + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + + StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); + SysomosConfiguration config = new ComponentConfigurator<>(SysomosConfiguration.class).detectConfiguration(typesafe, "rss"); + SysomosProvider provider = new SysomosProvider(config); + + ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); + provider.prepare(config); + provider.startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + for (StreamsDatum datum : provider.readCurrent()) { + String json; + try { + json = mapper.writeValueAsString(datum.getDocument()); + outStream.println(json); + } catch (JsonProcessingException ex) { + System.err.println(ex.getMessage()); + } + } + } + while (provider.isRunning()); + provider.cleanUp(); + outStream.flush(); + } + public SysomosConfiguration getConfig() { return config; } @@ -197,7 +238,7 @@ 
public class SysomosProvider implements StreamsProvider { public boolean isRunning() { return providerQueue.size() > 0 || (completedHeartbeats.size() < this.getConfig().getHeartbeatIds().size() - && !(stream.isTerminated() + && !(stream.isTerminated() || stream.isShutdown())); } @@ -205,7 +246,7 @@ public class SysomosProvider implements StreamsProvider { public void prepare(Object configurationObject) { this.providerQueue = constructQueue(); if (configurationObject instanceof Map) { - extractConfigFromMap((Map) configurationObject); + extractConfigFromMap((Map)configurationObject); } else if (configurationObject instanceof String) { documentIds = Splitter.on(";").trimResults().withKeyValueSeparator("=").split((String)configurationObject); } @@ -338,58 +379,5 @@ public class SysomosProvider implements StreamsProvider { return this.count.get(); } - /** - * To use from command line: - * - * <p/> - * Supply configuration similar to src/test/resources/rss.conf - * - * <p/> - * Launch using: - * - * <p/> - * mvn exec:java -Dexec.mainClass=org.apache.streams.rss.provider.RssStreamProvider -Dexec.args="rss.conf articles.json" - * - * @param args args - * @throws Exception Exception - */ - public static void main(String[] args) throws Exception { - - Preconditions.checkArgument(args.length >= 2); - - String configfile = args[0]; - String outfile = args[1]; - - Config reference = ConfigFactory.load(); - File file = new File(configfile); - assert (file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false)); - - Config typesafe = testResourceConfig.withFallback(reference).resolve(); - - StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); - SysomosConfiguration config = new ComponentConfigurator<>(SysomosConfiguration.class).detectConfiguration(typesafe, "rss"); - SysomosProvider provider = new SysomosProvider(config); - - ObjectMapper mapper = 
StreamsJacksonMapper.getInstance(); - - PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); - provider.prepare(config); - provider.startStream(); - do { - Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); - for (StreamsDatum datum : provider.readCurrent()) { - String json; - try { - json = mapper.writeValueAsString(datum.getDocument()); - outStream.println(json); - } catch (JsonProcessingException ex) { - System.err.println(ex.getMessage()); - } - } - } - while ( provider.isRunning() ); - provider.cleanUp(); - outStream.flush(); - } + public enum Mode {CONTINUOUS, BACKFILL_AND_TERMINATE} } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-sysomos/src/main/jsonschema/com/sysomos/Sysomos.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/main/jsonschema/com/sysomos/Sysomos.json b/streams-contrib/streams-provider-sysomos/src/main/jsonschema/com/sysomos/Sysomos.json deleted file mode 100644 index d9080e8..0000000 --- a/streams-contrib/streams-provider-sysomos/src/main/jsonschema/com/sysomos/Sysomos.json +++ /dev/null @@ -1,61 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-03/schema", - "$license": [ - "http://www.apache.org/licenses/LICENSE-2.0" - ], - "id": "#", - "type": "object", - "javaType": "com.sysomos.json.Sysomos", - "javaInterfaces": ["java.io.Serializable"], - "properties": { - "tweetJsonLink": { - "type": "string" - }, - "location": { - "properties": { - "locationString": { - "type": "string" - }, - "country": { - "type": "string" - } - } - }, - "link": { - "type": "string" - }, - "twitterFollowing": { - "type": "string" - }, - "twitterFollowers": { - "type": "string" - }, - "tweetid": { - "type": "string" - }, - "mediaType": { - "type": "string" - }, - "content": { - "type": "string" - }, - "docid": { - "type": 
"string" - }, - "sentiment": { - "type": "string" - }, - "time": { - "type": "string" - }, - "title": { - "type": "string" - }, - "tweetHbLink": { - "type": "string" - }, - "influenceLevel": { - "type": "string" - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-sysomos/src/main/jsonschema/com/sysomos/SysomosConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/main/jsonschema/com/sysomos/SysomosConfiguration.json b/streams-contrib/streams-provider-sysomos/src/main/jsonschema/com/sysomos/SysomosConfiguration.json deleted file mode 100644 index 4ce336e..0000000 --- a/streams-contrib/streams-provider-sysomos/src/main/jsonschema/com/sysomos/SysomosConfiguration.json +++ /dev/null @@ -1,43 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-03/schema", - "$license": [ - "http://www.apache.org/licenses/LICENSE-2.0" - ], - "id": "#", - "type": "object", - "javaType" : "com.sysomos.SysomosConfiguration", - "javaInterfaces": ["java.io.Serializable"], - "properties": { - "apiKey": { - "type": "string" - }, - "endpoint": { - "type": "string", - "description": "The endpoint", - "default": "http://api.sysomos.com/" - }, - "heartbeatIds": { - "type": "array", - "minItems": 1, - "items": { - "type": "string" - } - }, - "minDelayMs": { - "type": "String", - "format": "utc-millisec" - }, - "scheduledDelayMs": { - "type": "String", - "format": "utc-millisec" - }, - "maxBatchSize": { - "type": "String", - "format": "utc-millisec" - }, - "apiBatchSize": { - "type": "String", - "format": "utc-millisec" - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-sysomos/src/main/jsonschema/org/apache/streams/sysomos/Sysomos.json ---------------------------------------------------------------------- diff --git 
a/streams-contrib/streams-provider-sysomos/src/main/jsonschema/org/apache/streams/sysomos/Sysomos.json b/streams-contrib/streams-provider-sysomos/src/main/jsonschema/org/apache/streams/sysomos/Sysomos.json new file mode 100644 index 0000000..b5feb61 --- /dev/null +++ b/streams-contrib/streams-provider-sysomos/src/main/jsonschema/org/apache/streams/sysomos/Sysomos.json @@ -0,0 +1,61 @@ +{ + "$schema": "http://json-schema.org/draft-03/schema", + "$license": [ + "http://www.apache.org/licenses/LICENSE-2.0" + ], + "id": "#", + "type": "object", + "javaType": "org.apache.streams.sysomos.Sysomos", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "tweetJsonLink": { + "type": "string" + }, + "location": { + "properties": { + "locationString": { + "type": "string" + }, + "country": { + "type": "string" + } + } + }, + "link": { + "type": "string" + }, + "twitterFollowing": { + "type": "string" + }, + "twitterFollowers": { + "type": "string" + }, + "tweetid": { + "type": "string" + }, + "mediaType": { + "type": "string" + }, + "content": { + "type": "string" + }, + "docid": { + "type": "string" + }, + "sentiment": { + "type": "string" + }, + "time": { + "type": "string" + }, + "title": { + "type": "string" + }, + "tweetHbLink": { + "type": "string" + }, + "influenceLevel": { + "type": "string" + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-sysomos/src/main/jsonschema/org/apache/streams/sysomos/SysomosConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/main/jsonschema/org/apache/streams/sysomos/SysomosConfiguration.json b/streams-contrib/streams-provider-sysomos/src/main/jsonschema/org/apache/streams/sysomos/SysomosConfiguration.json new file mode 100644 index 0000000..487455f --- /dev/null +++ 
b/streams-contrib/streams-provider-sysomos/src/main/jsonschema/org/apache/streams/sysomos/SysomosConfiguration.json @@ -0,0 +1,43 @@ +{ + "$schema": "http://json-schema.org/draft-03/schema", + "$license": [ + "http://www.apache.org/licenses/LICENSE-2.0" + ], + "id": "#", + "type": "object", + "javaType" : "org.apache.streams.sysomos.SysomosConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "apiKey": { + "type": "string" + }, + "endpoint": { + "type": "string", + "description": "The endpoint", + "default": "http://api.sysomos.com/" + }, + "heartbeatIds": { + "type": "array", + "minItems": 1, + "items": { + "type": "string" + } + }, + "minDelayMs": { + "type": "String", + "format": "utc-millisec" + }, + "scheduledDelayMs": { + "type": "String", + "format": "utc-millisec" + }, + "maxBatchSize": { + "type": "String", + "format": "utc-millisec" + }, + "apiBatchSize": { + "type": "String", + "format": "utc-millisec" + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-sysomos/src/test/java/com/sysomos/test/SysomosJsonSerDeIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/test/java/com/sysomos/test/SysomosJsonSerDeIT.java b/streams-contrib/streams-provider-sysomos/src/test/java/com/sysomos/test/SysomosJsonSerDeIT.java deleted file mode 100644 index ab1c2a4..0000000 --- a/streams-contrib/streams-provider-sysomos/src/test/java/com/sysomos/test/SysomosJsonSerDeIT.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.sysomos.test; - -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.sysomos.json.Sysomos; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.Assert; -import org.testng.annotations.Test; - -import java.io.BufferedReader; -import java.io.InputStream; -import java.io.InputStreamReader; - -/** - * Tests ability to convert String json form to {@link com.sysomos.json.Sysomos} form - */ -public class SysomosJsonSerDeIT { - - private static final Logger LOGGER = LoggerFactory.getLogger(SysomosJsonSerDeIT.class); - - private ObjectMapper mapper = new ObjectMapper(); - - @Test - public void testSysomosJsonSerDe() { - - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE); - mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE); - mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE); - - InputStream is = SysomosJsonSerDeIT.class.getResourceAsStream("/sysomos_jsons.txt"); - InputStreamReader isr = new InputStreamReader(is); - BufferedReader br = new BufferedReader(isr); - - try { - while (br.ready()) { - String line = br.readLine(); - LOGGER.debug(line); - - Sysomos ser = mapper.readValue(line, Sysomos.class); - - String des = mapper.writeValueAsString(ser); - LOGGER.debug(des); - } - } catch ( Exception ex ) { - ex.printStackTrace(); - 
Assert.fail(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-sysomos/src/test/java/com/sysomos/test/SysomosXmlSerDeIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/test/java/com/sysomos/test/SysomosXmlSerDeIT.java b/streams-contrib/streams-provider-sysomos/src/test/java/com/sysomos/test/SysomosXmlSerDeIT.java deleted file mode 100644 index e078d02..0000000 --- a/streams-contrib/streams-provider-sysomos/src/test/java/com/sysomos/test/SysomosXmlSerDeIT.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package com.sysomos.test; - -import com.fasterxml.aalto.stax.InputFactoryImpl; -import com.fasterxml.aalto.stax.OutputFactoryImpl; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.dataformat.xml.JacksonXmlModule; -import com.fasterxml.jackson.dataformat.xml.XmlFactory; -import com.fasterxml.jackson.dataformat.xml.XmlMapper; -import com.sysomos.xml.BeatApi; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.InputStream; -import java.io.InputStreamReader; - -/** - * Tests ability to convert String xml form to {@link com.sysomos.xml.BeatApi} form - */ -public class SysomosXmlSerDeIT { - - private static final Logger LOGGER = LoggerFactory.getLogger(SysomosXmlSerDeIT.class); - - private XmlMapper xmlMapper; - - /** - * before. - */ - @Before - public void before() { - - XmlFactory xmlFactory = new XmlFactory(new InputFactoryImpl(), - new OutputFactoryImpl()); - - JacksonXmlModule module = new JacksonXmlModule(); - - module.setDefaultUseWrapper(false); - - xmlMapper = new XmlMapper(xmlFactory, module); - - xmlMapper - .configure( - DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, - Boolean.TRUE); - xmlMapper - .configure( - DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, - Boolean.TRUE); - xmlMapper - .configure( - DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY, - Boolean.TRUE); - xmlMapper.configure( - DeserializationFeature.READ_ENUMS_USING_TO_STRING, - Boolean.TRUE); - - } - - @Test - public void test() { - - InputStream is = SysomosXmlSerDeIT.class.getResourceAsStream("/sysomos_xmls.txt"); - InputStreamReader isr = new InputStreamReader(is); - BufferedReader br = new BufferedReader(isr); - - try { - while (br.ready()) { - String line = br.readLine(); - LOGGER.debug(line); - - BeatApi ser = xmlMapper.readValue(line, BeatApi.class); - - String des = 
xmlMapper.writeValueAsString(ser); - LOGGER.debug(des); - } - } catch ( Exception ex ) { - ex.printStackTrace(); - Assert.fail(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-sysomos/src/test/java/org/apache/streams/sysomos/test/SysomosJsonSerDeIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/test/java/org/apache/streams/sysomos/test/SysomosJsonSerDeIT.java b/streams-contrib/streams-provider-sysomos/src/test/java/org/apache/streams/sysomos/test/SysomosJsonSerDeIT.java new file mode 100644 index 0000000..56555ad --- /dev/null +++ b/streams-contrib/streams-provider-sysomos/src/test/java/org/apache/streams/sysomos/test/SysomosJsonSerDeIT.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.streams.sysomos.test; + +import org.apache.streams.sysomos.Sysomos; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; + +/** + * Tests ability to convert String json form to {@link org.apache.streams.sysomos.Sysomos} form and back: each line read from the classpath resource /sysomos_jsons.txt is deserialized with an ObjectMapper configured to ignore unknown properties, accept single values as arrays and treat empty strings as null objects, and is then re-serialized; both forms are only logged at debug level and never compared, so the test fails solely when an exception is thrown. NOTE(review): the reader is never closed, and a missing resource makes getResourceAsStream return null, which surfaces as a NullPointerException before the try block rather than as Assert.fail(). + */ +public class SysomosJsonSerDeIT { + + private static final Logger LOGGER = LoggerFactory.getLogger(SysomosJsonSerDeIT.class); + + private ObjectMapper mapper = new ObjectMapper(); + + @Test + public void testSysomosJsonSerDe() { + + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE); + mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE); + mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE); + + InputStream is = SysomosJsonSerDeIT.class.getResourceAsStream("/sysomos_jsons.txt"); + InputStreamReader isr = new InputStreamReader(is); + BufferedReader br = new BufferedReader(isr); + + try { + while (br.ready()) { + String line = br.readLine(); + LOGGER.debug(line); + + Sysomos ser = mapper.readValue(line, Sysomos.class); + + String des = mapper.writeValueAsString(ser); + LOGGER.debug(des); + } + } catch (Exception ex) { + ex.printStackTrace(); + Assert.fail(); + } + } +}
