Author: sblackmon
Date: Mon Feb 24 17:29:43 2014
New Revision: 1571362

URL: http://svn.apache.org/r1571362
Log:
fixing STREAMS-26 branch

Added:
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
      - copied, changed from r1571342, incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
Removed:
    incubator/streams/branches/STREAMS-26/trunk/
Modified:
    incubator/streams/branches/STREAMS-26/provision/provision.iml
    incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-rss/streams-provider-rss.iml
    incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java

Modified: incubator/streams/branches/STREAMS-26/provision/provision.iml
URL: 
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/provision/provision.iml?rev=1571362&r1=1571361&r2=1571362&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/provision/provision.iml (original)
+++ incubator/streams/branches/STREAMS-26/provision/provision.iml Mon Feb 24 
17:29:43 2014
@@ -1,13 +1,18 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <module 
org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" 
type="JAVA_MODULE" version="4">
-  <component name="NewModuleRootManager" inherit-compiler-output="false">
-    <output 
url="file://$MAVEN_REPOSITORY$/org/apache/streams/streams-master/0.2-incubating-SNAPSHOT/target/classes"
 />
-    <output-test 
url="file://$MAVEN_REPOSITORY$/org/apache/streams/streams-master/0.2-incubating-SNAPSHOT/target/test-classes"
 />
+  <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_6" 
inherit-compiler-output="false">
+    <output url="file://$MODULE_DIR$/target/classes" />
+    <output-test url="file://$MODULE_DIR$/target/test-classes" />
     <content url="file://$MODULE_DIR$">
       <excludeFolder url="file://$MODULE_DIR$/target" />
     </content>
     <orderEntry type="inheritedJdk" />
     <orderEntry type="sourceFolder" forTests="false" />
+    <orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.11" 
level="project" />
+    <orderEntry type="library" scope="TEST" name="Maven: 
org.hamcrest:hamcrest-core:1.3" level="project" />
+    <orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.6" 
level="project" />
+    <orderEntry type="library" name="Maven: 
ch.qos.logback:logback-classic:1.0.9" level="project" />
+    <orderEntry type="library" name="Maven: ch.qos.logback:logback-core:1.0.9" 
level="project" />
   </component>
   <component name="POM File Configuration" pomFile="" />
 </module>

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml
URL: 
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml?rev=1571362&r1=1571361&r2=1571362&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml Mon Feb 24 
17:29:43 2014
@@ -38,17 +38,15 @@
     <modules>
         <module>streams-persist-console</module>
         <module>streams-persist-elasticsearch</module>
-        <module>streams-persist-hbase</module>
-        <module>streams-persist-hdfs</module>
-        <module>streams-persist-kafka</module>
-        <module>streams-persist-mongo</module>
-        <module>streams-provider-datasift</module>
-        <module>streams-provider-facebook</module>
-        <module>streams-provider-gnip</module>
-        <module>streams-provider-moreover</module>
+        <!--<module>streams-persist-hdfs</module>-->
+        <!--<module>streams-persist-kafka</module>-->
+        <!--<module>streams-provider-datasift</module>-->
+        <!--<module>streams-provider-facebook</module>-->
+        <!--<module>streams-provider-gnip</module>-->
+        <!--<module>streams-provider-moreover</module>-->
         <module>streams-provider-twitter</module>
-        <module>streams-provider-sysomos</module>
-        <module>streams-provider-rss</module>
+        <!--<module>streams-provider-sysomos</module>-->
+        <!--<module>streams-provider-rss</module>-->
         <!--<module>streams-proxy-semantria</module>-->
     </modules>
 

Modified: 
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
URL: 
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java?rev=1571362&r1=1571361&r2=1571362&view=diff
==============================================================================
--- 
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
 (original)
+++ 
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
 Mon Feb 24 17:29:43 2014
@@ -12,22 +12,26 @@ import org.slf4j.LoggerFactory;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-public class ConsolePersistWriter extends StreamsPersistWriterTask implements 
StreamsPersistWriter  {
+public class ConsolePersistWriter implements StreamsPersistWriter, Runnable {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ConsolePersistWriter.class);
 
+    protected volatile Queue<StreamsDatum> persistQueue;
+
     private ObjectMapper mapper = new ObjectMapper();
 
-    public ConsolePersistWriter(StreamsPersistWriter writer) {
-        super(writer);
+    public ConsolePersistWriter() {
+        this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+    }
+
+    public ConsolePersistWriter(Queue<StreamsDatum> persistQueue) {
+        this.persistQueue = persistQueue;
     }
 
-    @Override
     public void prepare(Object o) {
-        Preconditions.checkNotNull(this.getInputQueues());
+
     }
 
-    @Override
     public void cleanUp() {
 
     }
@@ -47,4 +51,10 @@ public class ConsolePersistWriter extend
 
     }
 
+    @Override
+    public void run() {
+        Preconditions.checkNotNull(persistQueue);
+        new Thread(new ConsolePersistWriterTask(this)).start();
+    }
+
 }

Modified: 
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
URL: 
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java?rev=1571362&r1=1571361&r2=1571362&view=diff
==============================================================================
--- 
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
 (original)
+++ 
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
 Mon Feb 24 17:29:43 2014
@@ -168,11 +168,6 @@ public class ElasticsearchPersistReader 
         }
     }
 
-    @Override
-    public void cleanUp() {
-        LOGGER.info("PersistReader done");
-    }
-
     public void setWithfields(String[] withfields) {
         this.withfields = withfields;
     }
@@ -311,19 +306,48 @@ public class ElasticsearchPersistReader 
             item.getMetadata().put("type", hit.getType());
             currentQueue.add(item);
         }
+
+        cleanUp();
+
         return (StreamsResultSet)currentQueue;
     }
 
+    public StreamsResultSet readAll() {
+        return readCurrent();
+    }
+
     @Override
     public StreamsResultSet readNew(BigInteger sequence) {
-        return null;
+        return readCurrent();
     }
 
     @Override
     public StreamsResultSet readRange(DateTime start, DateTime end) {
-        return null;
+        return readCurrent();
     }
 
+    void shutdownAndAwaitTermination(ExecutorService pool) {
+        pool.shutdown(); // Disable new tasks from being submitted
+        try {
+            // Wait a while for existing tasks to terminate
+            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+                pool.shutdownNow(); // Cancel currently executing tasks
+                // Wait a while for tasks to respond to being cancelled
+                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
+                    System.err.println("Pool did not terminate");
+            }
+        } catch (InterruptedException ie) {
+            // (Re-)Cancel if current thread also interrupted
+            pool.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
+    public void cleanUp() {
+        LOGGER.info("PersistReader done");
+    }
 }
 
 

Modified: 
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
URL: 
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java?rev=1571362&r1=1571361&r2=1571362&view=diff
==============================================================================
--- 
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
 (original)
+++ 
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
 Mon Feb 24 17:29:43 2014
@@ -124,14 +124,29 @@ public class ElasticsearchPersistWriter 
     public boolean isConnected()                               { return 
(client != null); }
 
     @Override
-    public void prepare(Object o) {
+    public void write(StreamsDatum streamsDatum) {
+
+        String json;
+        try {
+
+            json = mapper.writeValueAsString(streamsDatum.getDocument());
+
+            add(index, type, null, json);
+
+        } catch (JsonProcessingException e) {
+            LOGGER.warn("{} {}", e.getLocation(), e.getMessage());
+
+        }
+    }
+
+    public void start() {
+
         manager = new ElasticsearchClientManager(config);
         client = manager.getClient();
 
         LOGGER.info(client.toString());
     }
 
-    @Override
     public void cleanUp() {
 
         try {
@@ -143,22 +158,6 @@ public class ElasticsearchPersistWriter 
     }
 
     @Override
-    public void write(StreamsDatum streamsDatum) {
-
-        String json;
-        try {
-
-            json = mapper.writeValueAsString(streamsDatum.getDocument());
-
-            add(index, type, null, json);
-
-        } catch (JsonProcessingException e) {
-            LOGGER.warn("{} {}", e.getLocation(), e.getMessage());
-
-        }
-    }
-
-    @Override
     public void close()
     {
         try
@@ -527,4 +526,9 @@ public class ElasticsearchPersistWriter 
         return toReturn;
     }
 
+    @Override
+    public void prepare(Object configurationObject) {
+        start();
+    }
+
 }

Copied: 
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
 (from r1571342, 
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java)
URL: 
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java?p2=incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java&p1=incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java&r1=1571342&r2=1571362&rev=1571362&view=diff
==============================================================================
--- 
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
 (original)
+++ 
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
 Mon Feb 24 17:29:43 2014
@@ -20,7 +20,7 @@ public class ElasticsearchPersistWriterT
     public void run() {
 
         while(true) {
-            if( writer.getPersistQueue().peek() != null ) {
+            if( writer.persistQueue.peek() != null ) {
                 try {
                     StreamsDatum entry = writer.persistQueue.remove();
                     writer.write(entry);

Modified: 
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-rss/streams-provider-rss.iml
URL: 
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-rss/streams-provider-rss.iml?rev=1571362&r1=1571361&r2=1571362&view=diff
==============================================================================
--- 
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-rss/streams-provider-rss.iml
 (original)
+++ 
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-rss/streams-provider-rss.iml
 Mon Feb 24 17:29:43 2014
@@ -1,10 +1,9 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<module 
org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" 
type="JAVA_MODULE" version="4">
+<module type="JAVA_MODULE" version="4">
   <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_6" 
inherit-compiler-output="false">
     <output url="file://$MODULE_DIR$/target/classes" />
     <output-test url="file://$MODULE_DIR$/target/test-classes" />
     <content url="file://$MODULE_DIR$">
-      <sourceFolder url="file://$MODULE_DIR$/target/generated-sources/jaxb2" 
isTestSource="false" generated="true" />
       <sourceFolder 
url="file://$MODULE_DIR$/target/generated-sources/jsonschema2pojo" 
isTestSource="false" generated="true" />
       <sourceFolder url="file://$MODULE_DIR$/src/main/java" 
isTestSource="false" />
       <sourceFolder url="file://$MODULE_DIR$/src/test/java" 
isTestSource="true" />
@@ -30,10 +29,10 @@
     <orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.6" 
level="project" />
     <orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.11" 
level="project" />
     <orderEntry type="library" name="Maven: org.hamcrest:hamcrest-core:1.3" 
level="project" />
-    <orderEntry type="library" name="Maven: 
org.apache.streams:streams-core:0.1-SNAPSHOT" level="project" />
+    <orderEntry type="module" module-name="streams-core" />
     <orderEntry type="library" name="Maven: 
ch.qos.logback:logback-classic:1.0.9" level="project" />
     <orderEntry type="library" name="Maven: ch.qos.logback:logback-core:1.0.9" 
level="project" />
-    <orderEntry type="library" name="Maven: 
org.apache.streams:streams-pojo:0.1-SNAPSHOT" level="project" />
+    <orderEntry type="module" module-name="streams-pojo" />
     <orderEntry type="library" name="Maven: 
com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.2.1" level="project" 
/>
     <orderEntry type="library" name="Maven: 
com.fasterxml.jackson.module:jackson-module-jaxb-annotations:2.2.1" 
level="project" />
     <orderEntry type="library" name="Maven: 
org.codehaus.woodstox:stax2-api:3.1.1" level="project" />
@@ -49,15 +48,15 @@
     <orderEntry type="library" name="Maven: 
com.sun.xml.fastinfoset:FastInfoset:1.2.12" level="project" />
     <orderEntry type="library" name="Maven: javax.xml.bind:jsr173_api:1.0" 
level="project" />
     <orderEntry type="library" name="Maven: commons-io:commons-io:2.4" 
level="project" />
-    <orderEntry type="library" name="Maven: 
org.apache.streams:streams-config:0.1-SNAPSHOT" level="project" />
-    <orderEntry type="library" name="Maven: com.google.guava:guava:16.0.1" 
level="project" />
+    <orderEntry type="module" module-name="streams-config" />
+    <orderEntry type="library" name="Maven: com.google.guava:guava:15.0" 
level="project" />
     <orderEntry type="library" name="Maven: 
com.google.collections:google-collections:1.0" level="project" />
     <orderEntry type="library" name="Maven: 
com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.2.1" level="project" 
/>
     <orderEntry type="library" name="Maven: 
com.jayway.jsonpath:json-path:0.9.0" level="project" />
     <orderEntry type="library" name="Maven: net.minidev:json-smart:1.2" 
level="project" />
     <orderEntry type="library" name="Maven: 
com.jayway.jsonpath:json-path-assert:0.9.0" level="project" />
     <orderEntry type="library" name="Maven: org.hamcrest:hamcrest-library:1.3" 
level="project" />
-    <orderEntry type="module" module-name="streams-persist-console (2)" 
scope="TEST" />
+    <orderEntry type="module" module-name="streams-persist-console" 
scope="TEST" />
     <orderEntry type="library" name="Maven: rome:rome:1.0" level="project" />
     <orderEntry type="library" name="Maven: jdom:jdom:1.0" level="project" />
   </component>

Modified: 
incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java
URL: 
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java?rev=1571362&r1=1571361&r2=1571362&view=diff
==============================================================================
--- 
incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java
 (original)
+++ 
incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java
 Mon Feb 24 17:29:43 2014
@@ -32,15 +32,9 @@ import java.util.Queue;
  */
 public interface StreamsPersistReader extends StreamsProvider {
 
-//    void start();
-//    void stop();
-//
-//    public void setPersistQueue(Queue<StreamsDatum> persistQueue);
-//    public Queue<StreamsDatum> getPersistQueue();
-
-//    public StreamsResultSet readAll();
-//    public StreamsResultSet readNew(BigInteger sequence);
-//    public StreamsResultSet readRange(DateTime start, DateTime end);
+    public StreamsResultSet readAll();
+    public StreamsResultSet readNew(BigInteger sequence);
+    public StreamsResultSet readRange(DateTime start, DateTime end);
 
 
 }


Reply via email to