Author: sblackmon
Date: Mon Feb 24 17:29:43 2014
New Revision: 1571362
URL: http://svn.apache.org/r1571362
Log:
fixing STREAMS-26 branch
Added:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
- copied, changed from r1571342,
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
Removed:
incubator/streams/branches/STREAMS-26/trunk/
Modified:
incubator/streams/branches/STREAMS-26/provision/provision.iml
incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-rss/streams-provider-rss.iml
incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java
Modified: incubator/streams/branches/STREAMS-26/provision/provision.iml
URL:
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/provision/provision.iml?rev=1571362&r1=1571361&r2=1571362&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/provision/provision.iml (original)
+++ incubator/streams/branches/STREAMS-26/provision/provision.iml Mon Feb 24
17:29:43 2014
@@ -1,13 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<module
org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true"
type="JAVA_MODULE" version="4">
- <component name="NewModuleRootManager" inherit-compiler-output="false">
- <output
url="file://$MAVEN_REPOSITORY$/org/apache/streams/streams-master/0.2-incubating-SNAPSHOT/target/classes"
/>
- <output-test
url="file://$MAVEN_REPOSITORY$/org/apache/streams/streams-master/0.2-incubating-SNAPSHOT/target/test-classes"
/>
+ <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_6"
inherit-compiler-output="false">
+ <output url="file://$MODULE_DIR$/target/classes" />
+ <output-test url="file://$MODULE_DIR$/target/test-classes" />
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/target" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
+ <orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.11"
level="project" />
+ <orderEntry type="library" scope="TEST" name="Maven:
org.hamcrest:hamcrest-core:1.3" level="project" />
+ <orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.6"
level="project" />
+ <orderEntry type="library" name="Maven:
ch.qos.logback:logback-classic:1.0.9" level="project" />
+ <orderEntry type="library" name="Maven: ch.qos.logback:logback-core:1.0.9"
level="project" />
</component>
<component name="POM File Configuration" pomFile="" />
</module>
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml
URL:
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml?rev=1571362&r1=1571361&r2=1571362&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml Mon Feb 24
17:29:43 2014
@@ -38,17 +38,15 @@
<modules>
<module>streams-persist-console</module>
<module>streams-persist-elasticsearch</module>
- <module>streams-persist-hbase</module>
- <module>streams-persist-hdfs</module>
- <module>streams-persist-kafka</module>
- <module>streams-persist-mongo</module>
- <module>streams-provider-datasift</module>
- <module>streams-provider-facebook</module>
- <module>streams-provider-gnip</module>
- <module>streams-provider-moreover</module>
+ <!--<module>streams-persist-hdfs</module>-->
+ <!--<module>streams-persist-kafka</module>-->
+ <!--<module>streams-provider-datasift</module>-->
+ <!--<module>streams-provider-facebook</module>-->
+ <!--<module>streams-provider-gnip</module>-->
+ <!--<module>streams-provider-moreover</module>-->
<module>streams-provider-twitter</module>
- <module>streams-provider-sysomos</module>
- <module>streams-provider-rss</module>
+ <!--<module>streams-provider-sysomos</module>-->
+ <!--<module>streams-provider-rss</module>-->
<!--<module>streams-proxy-semantria</module>-->
</modules>
Modified:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
URL:
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java?rev=1571362&r1=1571361&r2=1571362&view=diff
==============================================================================
---
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
(original)
+++
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
Mon Feb 24 17:29:43 2014
@@ -12,22 +12,26 @@ import org.slf4j.LoggerFactory;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
-public class ConsolePersistWriter extends StreamsPersistWriterTask implements
StreamsPersistWriter {
+public class ConsolePersistWriter implements StreamsPersistWriter, Runnable {
private static final Logger LOGGER =
LoggerFactory.getLogger(ConsolePersistWriter.class);
+ protected volatile Queue<StreamsDatum> persistQueue;
+
private ObjectMapper mapper = new ObjectMapper();
- public ConsolePersistWriter(StreamsPersistWriter writer) {
- super(writer);
+ public ConsolePersistWriter() {
+ this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+ }
+
+ public ConsolePersistWriter(Queue<StreamsDatum> persistQueue) {
+ this.persistQueue = persistQueue;
}
- @Override
public void prepare(Object o) {
- Preconditions.checkNotNull(this.getInputQueues());
+
}
- @Override
public void cleanUp() {
}
@@ -47,4 +51,10 @@ public class ConsolePersistWriter extend
}
+ @Override
+ public void run() {
+ Preconditions.checkNotNull(persistQueue);
+ new Thread(new ConsolePersistWriterTask(this)).start();
+ }
+
}
Modified:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
URL:
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java?rev=1571362&r1=1571361&r2=1571362&view=diff
==============================================================================
---
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
(original)
+++
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
Mon Feb 24 17:29:43 2014
@@ -168,11 +168,6 @@ public class ElasticsearchPersistReader
}
}
- @Override
- public void cleanUp() {
- LOGGER.info("PersistReader done");
- }
-
public void setWithfields(String[] withfields) {
this.withfields = withfields;
}
@@ -311,19 +306,48 @@ public class ElasticsearchPersistReader
item.getMetadata().put("type", hit.getType());
currentQueue.add(item);
}
+
+ cleanUp();
+
return (StreamsResultSet)currentQueue;
}
+ public StreamsResultSet readAll() {
+ return readCurrent();
+ }
+
@Override
public StreamsResultSet readNew(BigInteger sequence) {
- return null;
+ return readCurrent();
}
@Override
public StreamsResultSet readRange(DateTime start, DateTime end) {
- return null;
+ return readCurrent();
}
+ void shutdownAndAwaitTermination(ExecutorService pool) {
+ pool.shutdown(); // Disable new tasks from being submitted
+ try {
+ // Wait a while for existing tasks to terminate
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+ pool.shutdownNow(); // Cancel currently executing tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS))
+ System.err.println("Pool did not terminate");
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ pool.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ public void cleanUp() {
+ LOGGER.info("PersistReader done");
+ }
}
Modified:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
URL:
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java?rev=1571362&r1=1571361&r2=1571362&view=diff
==============================================================================
---
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
(original)
+++
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
Mon Feb 24 17:29:43 2014
@@ -124,14 +124,29 @@ public class ElasticsearchPersistWriter
public boolean isConnected() { return
(client != null); }
@Override
- public void prepare(Object o) {
+ public void write(StreamsDatum streamsDatum) {
+
+ String json;
+ try {
+
+ json = mapper.writeValueAsString(streamsDatum.getDocument());
+
+ add(index, type, null, json);
+
+ } catch (JsonProcessingException e) {
+ LOGGER.warn("{} {}", e.getLocation(), e.getMessage());
+
+ }
+ }
+
+ public void start() {
+
manager = new ElasticsearchClientManager(config);
client = manager.getClient();
LOGGER.info(client.toString());
}
- @Override
public void cleanUp() {
try {
@@ -143,22 +158,6 @@ public class ElasticsearchPersistWriter
}
@Override
- public void write(StreamsDatum streamsDatum) {
-
- String json;
- try {
-
- json = mapper.writeValueAsString(streamsDatum.getDocument());
-
- add(index, type, null, json);
-
- } catch (JsonProcessingException e) {
- LOGGER.warn("{} {}", e.getLocation(), e.getMessage());
-
- }
- }
-
- @Override
public void close()
{
try
@@ -527,4 +526,9 @@ public class ElasticsearchPersistWriter
return toReturn;
}
+ @Override
+ public void prepare(Object configurationObject) {
+ start();
+ }
+
}
Copied:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
(from r1571342,
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java)
URL:
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java?p2=incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java&p1=incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java&r1=1571342&r2=1571362&rev=1571362&view=diff
==============================================================================
---
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
(original)
+++
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
Mon Feb 24 17:29:43 2014
@@ -20,7 +20,7 @@ public class ElasticsearchPersistWriterT
public void run() {
while(true) {
- if( writer.getPersistQueue().peek() != null ) {
+ if( writer.persistQueue.peek() != null ) {
try {
StreamsDatum entry = writer.persistQueue.remove();
writer.write(entry);
Modified:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-rss/streams-provider-rss.iml
URL:
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-rss/streams-provider-rss.iml?rev=1571362&r1=1571361&r2=1571362&view=diff
==============================================================================
---
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-rss/streams-provider-rss.iml
(original)
+++
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-rss/streams-provider-rss.iml
Mon Feb 24 17:29:43 2014
@@ -1,10 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
-<module
org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true"
type="JAVA_MODULE" version="4">
+<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_6"
inherit-compiler-output="false">
<output url="file://$MODULE_DIR$/target/classes" />
<output-test url="file://$MODULE_DIR$/target/test-classes" />
<content url="file://$MODULE_DIR$">
- <sourceFolder url="file://$MODULE_DIR$/target/generated-sources/jaxb2"
isTestSource="false" generated="true" />
<sourceFolder
url="file://$MODULE_DIR$/target/generated-sources/jsonschema2pojo"
isTestSource="false" generated="true" />
<sourceFolder url="file://$MODULE_DIR$/src/main/java"
isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/test/java"
isTestSource="true" />
@@ -30,10 +29,10 @@
<orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.6"
level="project" />
<orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.11"
level="project" />
<orderEntry type="library" name="Maven: org.hamcrest:hamcrest-core:1.3"
level="project" />
- <orderEntry type="library" name="Maven:
org.apache.streams:streams-core:0.1-SNAPSHOT" level="project" />
+ <orderEntry type="module" module-name="streams-core" />
<orderEntry type="library" name="Maven:
ch.qos.logback:logback-classic:1.0.9" level="project" />
<orderEntry type="library" name="Maven: ch.qos.logback:logback-core:1.0.9"
level="project" />
- <orderEntry type="library" name="Maven:
org.apache.streams:streams-pojo:0.1-SNAPSHOT" level="project" />
+ <orderEntry type="module" module-name="streams-pojo" />
<orderEntry type="library" name="Maven:
com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.2.1" level="project"
/>
<orderEntry type="library" name="Maven:
com.fasterxml.jackson.module:jackson-module-jaxb-annotations:2.2.1"
level="project" />
<orderEntry type="library" name="Maven:
org.codehaus.woodstox:stax2-api:3.1.1" level="project" />
@@ -49,15 +48,15 @@
<orderEntry type="library" name="Maven:
com.sun.xml.fastinfoset:FastInfoset:1.2.12" level="project" />
<orderEntry type="library" name="Maven: javax.xml.bind:jsr173_api:1.0"
level="project" />
<orderEntry type="library" name="Maven: commons-io:commons-io:2.4"
level="project" />
- <orderEntry type="library" name="Maven:
org.apache.streams:streams-config:0.1-SNAPSHOT" level="project" />
- <orderEntry type="library" name="Maven: com.google.guava:guava:16.0.1"
level="project" />
+ <orderEntry type="module" module-name="streams-config" />
+ <orderEntry type="library" name="Maven: com.google.guava:guava:15.0"
level="project" />
<orderEntry type="library" name="Maven:
com.google.collections:google-collections:1.0" level="project" />
<orderEntry type="library" name="Maven:
com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.2.1" level="project"
/>
<orderEntry type="library" name="Maven:
com.jayway.jsonpath:json-path:0.9.0" level="project" />
<orderEntry type="library" name="Maven: net.minidev:json-smart:1.2"
level="project" />
<orderEntry type="library" name="Maven:
com.jayway.jsonpath:json-path-assert:0.9.0" level="project" />
<orderEntry type="library" name="Maven: org.hamcrest:hamcrest-library:1.3"
level="project" />
- <orderEntry type="module" module-name="streams-persist-console (2)"
scope="TEST" />
+ <orderEntry type="module" module-name="streams-persist-console"
scope="TEST" />
<orderEntry type="library" name="Maven: rome:rome:1.0" level="project" />
<orderEntry type="library" name="Maven: jdom:jdom:1.0" level="project" />
</component>
Modified:
incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java
URL:
http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java?rev=1571362&r1=1571361&r2=1571362&view=diff
==============================================================================
---
incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java
(original)
+++
incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java
Mon Feb 24 17:29:43 2014
@@ -32,15 +32,9 @@ import java.util.Queue;
*/
public interface StreamsPersistReader extends StreamsProvider {
-// void start();
-// void stop();
-//
-// public void setPersistQueue(Queue<StreamsDatum> persistQueue);
-// public Queue<StreamsDatum> getPersistQueue();
-
-// public StreamsResultSet readAll();
-// public StreamsResultSet readNew(BigInteger sequence);
-// public StreamsResultSet readRange(DateTime start, DateTime end);
+ public StreamsResultSet readAll();
+ public StreamsResultSet readNew(BigInteger sequence);
+ public StreamsResultSet readRange(DateTime start, DateTime end);
}