Repository: incubator-streams Updated Branches: refs/heads/STREAMS-26 34990168b -> 3a192885a
Cleaning up contrib modules Preparing to merge STREAMS-26 Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f9165129 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f9165129 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f9165129 Branch: refs/heads/STREAMS-26 Commit: f916512905fc911ee7bc41fe0cef7478dc0cdefb Parents: 3499016 Author: sblackmon <[email protected]> Authored: Wed Mar 19 13:36:33 2014 -0500 Committer: sblackmon <[email protected]> Committed: Wed Mar 19 13:36:33 2014 -0500 ---------------------------------------------------------------------- streams-contrib/pom.xml | 11 ++++---- .../streams/hdfs/WebHdfsPersistReader.java | 27 +++++++++++++------- .../streams/hdfs/WebHdfsPersistReaderTask.java | 4 +-- .../streams/rss/provider/RssStreamProvider.java | 2 +- .../streams/rss/test/Top100FeedsTest.java | 4 ++- .../streams-provider-rss.iml | 16 +++--------- .../apache/streams/sysomos/SysomosProvider.java | 20 +++++++-------- 7 files changed, 43 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f9165129/streams-contrib/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml index 0cd5db7..37ee3b7 100644 --- a/streams-contrib/pom.xml +++ b/streams-contrib/pom.xml @@ -41,14 +41,13 @@ <module>streams-persist-hdfs</module> <module>streams-persist-kafka</module> <module>streams-persist-mongo</module> - <!--<module>streams-provider-datasift</module>--> - <!--<module>streams-provider-facebook</module>--> - <!--<module>streams-provider-gnip</module>--> + <module>streams-provider-datasift</module> + <module>streams-provider-facebook</module> + <module>streams-provider-gnip</module> <module>streams-provider-moreover</module> <module>streams-provider-twitter</module> - <!--<module>streams-provider-sysomos</module>--> - <!--<module>streams-provider-rss</module>--> - <!--<module>streams-proxy-semantria</module>--> + <module>streams-provider-sysomos</module> + <module>streams-provider-rss</module> <module>streams-components-test</module> </modules> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f9165129/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java index 659c517..511f684 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java @@ -170,26 +170,35 @@ public class WebHdfsPersistReader implements StreamsPersistReader { private void readSourceWritePersistQueue() { for( FileStatus fileStatus : status ) { BufferedReader reader; - - if( fileStatus.isFile() && !fileStatus.getPath().getName().endsWith("_SUCCESS")) { + LOGGER.info("Found " + fileStatus.getPath().getName()); + if( persistQueue.size() > 0 ) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + } + if( fileStatus.isFile() && !fileStatus.getPath().getName().startsWith("_")) { + LOGGER.info("Processing " + fileStatus.getPath().getName()); try { reader = new BufferedReader(new InputStreamReader(client.open(fileStatus.getPath()))); - String line; + String line = ""; do{ try { line = reader.readLine(); - if( line != null ) { + if( !Strings.isNullOrEmpty(line) ) { String[] fields = line.split(Character.toString(DELIMITER)); - persistQueue.offer(new StreamsDatum(fields[3])); + StreamsDatum entry = new StreamsDatum(fields[3], fields[0], new DateTime(fields[2])); + persistQueue.offer(entry); } - } catch (IOException e) { - break; + } catch (Exception e) { + e.printStackTrace(); + LOGGER.warn(e.getMessage()); } } while( line != null ); - } catch (IOException e) { + } catch (Exception e) { e.printStackTrace(); - break; + LOGGER.warn(e.getMessage()); } } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f9165129/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java index 6cd1e79..f0bee1f 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java @@ -29,7 +29,7 @@ public class WebHdfsPersistReaderTask implements Runnable { for( FileStatus fileStatus : reader.status ) { BufferedReader bufferedReader; - if( fileStatus.isFile() && !fileStatus.getPath().getName().endsWith("_SUCCESS")) { + if( fileStatus.isFile() && !fileStatus.getPath().getName().startsWith("_")) { try { bufferedReader = new BufferedReader(new InputStreamReader(reader.client.open(fileStatus.getPath()))); @@ -45,7 +45,7 @@ public class WebHdfsPersistReaderTask implements Runnable { reader.persistQueue.offer(entry); } } catch (Exception e) { - LOGGER.warn("Failed processing " + line); + LOGGER.warn("Failed reading " + line); } } while( line != null ); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f9165129/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java index 449f187..c4eee04 100644 --- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java +++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java @@ -31,7 +31,7 @@ import java.util.concurrent.*; /** * Created by sblackmon on 12/10/13. */ -public class RssStreamProvider implements StreamsProvider, Serializable { +public class RssStreamProvider implements StreamsProvider { private final static Logger LOGGER = LoggerFactory.getLogger(RssStreamProvider.class); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f9165129/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/Top100FeedsTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/Top100FeedsTest.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/Top100FeedsTest.java index 0c17979..1277553 100644 --- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/Top100FeedsTest.java +++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/Top100FeedsTest.java @@ -2,6 +2,7 @@ package org.apache.streams.rss.test; import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; +import org.apache.streams.core.tasks.StreamsProviderTask; import org.apache.streams.pojo.json.Activity; import org.apache.streams.rss.FeedDetails; import org.apache.streams.rss.RssStreamConfiguration; @@ -53,7 +54,8 @@ public class Top100FeedsTest{ configuration.setFeeds(feeds); RssStreamProvider provider = new RssStreamProvider(configuration, Activity.class); - provider.start(); + provider.prepare(configuration); + provider.startStream(); try { Thread.sleep(10000); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f9165129/streams-contrib/streams-provider-rss/streams-provider-rss.iml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/streams-provider-rss.iml b/streams-contrib/streams-provider-rss/streams-provider-rss.iml index 2846a74..032b03a 100644 --- a/streams-contrib/streams-provider-rss/streams-provider-rss.iml +++ b/streams-contrib/streams-provider-rss/streams-provider-rss.iml @@ -1,20 +1,13 @@ <?xml version="1.0" encoding="UTF-8"?> -<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4"> +<module type="JAVA_MODULE" version="4"> <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_6" inherit-compiler-output="false"> <output url="file://$MODULE_DIR$/target/classes" /> <output-test url="file://$MODULE_DIR$/target/test-classes" /> <content url="file://$MODULE_DIR$"> <sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" /> <sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" /> - <sourceFolder url="file://$MODULE_DIR$/src/main/resources" type="java-resource" /> <sourceFolder url="file://$MODULE_DIR$/src/test/resources" type="java-test-resource" /> - <sourceFolder url="file://$MODULE_DIR$/target/generated-sources/jsonschema2pojo" isTestSource="false" generated="true" /> - <excludeFolder url="file://$MODULE_DIR$/target/classes" /> - <excludeFolder url="file://$MODULE_DIR$/target/maven-archiver" /> - <excludeFolder url="file://$MODULE_DIR$/target/maven-shared-archive-resources" /> - <excludeFolder url="file://$MODULE_DIR$/target/maven-status" /> - <excludeFolder url="file://$MODULE_DIR$/target/surefire-reports" /> - <excludeFolder url="file://$MODULE_DIR$/target/test-classes" /> + <excludeFolder url="file://$MODULE_DIR$/target" /> </content> <orderEntry type="inheritedJdk" /> <orderEntry type="sourceFolder" forTests="false" /> @@ -35,9 +28,6 @@ <orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.11" level="project" /> <orderEntry type="library" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" /> <orderEntry type="module" module-name="streams-core" /> - <orderEntry type="module" module-name="streams-util" /> - <orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.1" level="project" /> - <orderEntry type="library" name="Maven: com.google.guava:guava:16.0.1" level="project" /> <orderEntry type="library" name="Maven: ch.qos.logback:logback-classic:1.0.9" level="project" /> <orderEntry type="library" name="Maven: ch.qos.logback:logback-core:1.0.9" level="project" /> <orderEntry type="module" module-name="streams-pojo" /> @@ -57,6 +47,8 @@ <orderEntry type="library" name="Maven: javax.xml.bind:jsr173_api:1.0" level="project" /> <orderEntry type="library" name="Maven: commons-io:commons-io:2.4" level="project" /> <orderEntry type="module" module-name="streams-config" /> + <orderEntry type="library" name="Maven: com.google.guava:guava:15.0" level="project" /> + <orderEntry type="library" name="Maven: com.google.collections:google-collections:1.0" level="project" /> <orderEntry type="library" name="Maven: com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.2.1" level="project" /> <orderEntry type="library" name="Maven: com.jayway.jsonpath:json-path:0.9.0" level="project" /> <orderEntry type="library" name="Maven: net.minidev:json-smart:1.2" level="project" /> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f9165129/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProvider.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProvider.java index 29c0e60..4a5e3ba 100644 --- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProvider.java +++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProvider.java @@ -64,7 +64,7 @@ public class SysomosProvider implements StreamsProvider { ScheduledExecutorService service; @Override - public void start() { + public void startStream() { LOGGER.trace("Starting Producer"); if(!started) { LOGGER.trace("Producer not started. Initializing"); @@ -80,27 +80,27 @@ public class SysomosProvider implements StreamsProvider { } @Override - public void stop() { - started = false; + public StreamsResultSet readCurrent() { + return null; } @Override - public Queue<StreamsDatum> getProviderQueue() { - return providerQueue; + public StreamsResultSet readNew(BigInteger bigInteger) { + return null; } @Override - public StreamsResultSet readCurrent() { + public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) { return null; } @Override - public StreamsResultSet readNew(BigInteger bigInteger) { - return null; + public void prepare(Object configurationObject) { + } @Override - public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) { - return null; + public void cleanUp() { + } }
