all five flink examples passing

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/9dcdf645
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/9dcdf645
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/9dcdf645

Branch: refs/heads/master
Commit: 9dcdf645080302d2f8e1bc7dc3d312817d459cf5
Parents: 58fefc0
Author: Steve Blackmon @steveblackmon <[email protected]>
Authored: Wed Oct 5 16:42:25 2016 -0500
Committer: Steve Blackmon @steveblackmon <[email protected]>
Committed: Wed Oct 5 16:42:25 2016 -0500

----------------------------------------------------------------------
 flink/flink-twitter-collection/pom.xml          | 28 ++++---
 .../FlinkTwitterFollowingPipeline.scala         |  2 +-
 .../collection/FlinkTwitterPostsPipeline.scala  |  2 +-
 .../FlinkTwitterSpritzerPipeline.scala          | 28 +++++--
 .../FlinkTwitterUserInformationPipeline.scala   |  2 +-
 .../markdown/FlinkTwitterSpritzerPipeline.md    |  6 +-
 .../src/site/markdown/index.md                  |  6 +-
 .../resources/FlinkTwitterFollowingPipeline.dot | 37 +++++++++
 .../resources/FlinkTwitterPostsPipeline.dot     | 37 +++++++++
 .../resources/FlinkTwitterSpritzerPipeline.dot  | 33 ++++++++
 .../FlinkTwitterUserInformationPipeline.dot     | 37 +++++++++
 ...linkTwitterFollowingPipelineFollowersIT.conf |  6 +-
 .../FlinkTwitterFollowingPipelineFriendsIT.conf |  5 +-
 .../FlinkTwitterSpritzerPipelineIT.conf         | 15 ++++
 .../FlinkTwitterUserInformationPipelineIT.conf  |  2 +-
 ...inkTwitterFollowingPipelineFollowersIT.scala | 55 +++++++++++++
 ...FlinkTwitterFollowingPipelineFriendsIT.scala | 59 ++++++++++++++
 .../test/FlinkTwitterFollowingPipelineIT.scala  | 86 --------------------
 .../test/FlinkTwitterSpritzerPipelineIT.scala   |  9 +-
 19 files changed, 336 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/pom.xml
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/pom.xml 
b/flink/flink-twitter-collection/pom.xml
index 2d35035..4cf0b89 100644
--- a/flink/flink-twitter-collection/pom.xml
+++ b/flink/flink-twitter-collection/pom.xml
@@ -448,16 +448,24 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-failsafe-plugin</artifactId>
-                <version>2.12.4</version>
-                <executions>
-                    <execution>
-                        <id>integration-tests</id>
-                        <goals>
-                            <goal>integration-test</goal>
-                            <goal>verify</goal>
-                        </goals>
-                    </execution>
-                </executions>
+                <configuration>
+                    <!-- Run integration test suite rather than individual 
tests. -->
+                    <excludes>
+                        <exclude>**/*Test.java</exclude>
+                        <exclude>**/*Tests.java</exclude>
+                    </excludes>
+                    <includes>
+                        <include>**/*IT.java</include>
+                        <include>**/*ITs.java</include>
+                    </includes>
+                </configuration>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.apache.maven.surefire</groupId>
+                        <artifactId>surefire-testng</artifactId>
+                        <version>${failsafe.plugin.version}</version>
+                    </dependency>
+                </dependencies>
             </plugin>
         </plugins>
     </build>

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
----------------------------------------------------------------------
diff --git 
a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
 
b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
index 2fd9336..a20078e 100644
--- 
a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
+++ 
b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -126,7 +126,7 @@ class FlinkTwitterFollowingPipeline(config: 
TwitterFollowingPipelineConfiguratio
 
         // if( test == true ) jsons.print();
 
-        env.execute("FlinkTwitterFollowingPipeline")
+        env.execute(STREAMS_ID)
     }
 
     class FollowingCollectorFlatMapFunction(

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
----------------------------------------------------------------------
diff --git 
a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
 
b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
index beea973..bb7d54c 100644
--- 
a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
+++ 
b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -134,7 +134,7 @@ class FlinkTwitterPostsPipeline(config: 
TwitterPostsPipelineConfiguration = new
 
     // if( test == true ) jsons.print();
 
-    env.execute("FlinkTwitterPostsPipeline")
+    env.execute(STREAMS_ID)
   }
 
   class postCollectorFlatMapFunction extends RichFlatMapFunction[String, 
StreamsDatum] with Serializable {

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
----------------------------------------------------------------------
diff --git 
a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
 
b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
index b615806..d6ed3df 100644
--- 
a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
+++ 
b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
@@ -1,10 +1,12 @@
 package org.apache.streams.examples.flink.twitter.collection
 
+import java.io.Serializable
 import java.util.concurrent.TimeUnit
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.google.common.base.{Preconditions, Strings}
 import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.flink.api.common.functions.StoppableFunction
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.streaming.api.TimeCharacteristic
@@ -21,6 +23,7 @@ import org.apache.streams.twitter.TwitterStreamConfiguration
 import org.apache.streams.twitter.provider.TwitterStreamProvider
 import org.slf4j.{Logger, LoggerFactory}
 import org.apache.flink.api.scala._
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat
 
 import scala.collection.JavaConversions._
 
@@ -82,6 +85,8 @@ class FlinkTwitterSpritzerPipeline(config: 
TwitterSpritzerPipelineConfiguration
 
   import FlinkTwitterSpritzerPipeline._
 
+  val spritzerSource = new SpritzerSource(config.getTwitter)
+
   override def run(): Unit = {
 
     val env: StreamExecutionEnvironment = 
streamEnvironment(MAPPER.convertValue(config, 
classOf[FlinkStreamingConfiguration]))
@@ -91,7 +96,7 @@ class FlinkTwitterSpritzerPipeline(config: 
TwitterSpritzerPipelineConfiguration
 
     val outPath = buildWriterPath(config.getDestination)
 
-    val streamSource : DataStream[String] = env.addSource(new 
SpritzerSource(config.getTwitter));
+    val streamSource : DataStream[String] = env.addSource(spritzerSource);
 
     if( config.getTest == false )
       streamSource.addSink(new 
RollingSink[String](outPath)).setParallelism(3).name("hdfs")
@@ -101,15 +106,23 @@ class FlinkTwitterSpritzerPipeline(config: 
TwitterSpritzerPipelineConfiguration
 
     // if( test == true ) jsons.print();
 
-    env.execute("FlinkTwitterPostsPipeline")
+    env.execute(STREAMS_ID)
+
+  }
+
+  def stop(): Unit = {
+    spritzerSource.stop()
   }
 
-  class SpritzerSource(sourceConfig: TwitterStreamConfiguration) extends 
RichSourceFunction[String] with Serializable {
+  class SpritzerSource(sourceConfig: TwitterStreamConfiguration) extends 
RichSourceFunction[String] with Serializable with StoppableFunction {
+
+    var mapper: ObjectMapper = _
 
     var twitProvider: TwitterStreamProvider = _
 
     @throws[Exception]
     override def open(parameters: Configuration): Unit = {
+      mapper = 
StreamsJacksonMapper.getInstance(TwitterDateTimeFormat.TWITTER_FORMAT)
       twitProvider = new TwitterStreamProvider( sourceConfig )
       twitProvider.prepare(twitProvider)
       twitProvider.startStream()
@@ -120,17 +133,16 @@ class FlinkTwitterSpritzerPipeline(config: 
TwitterSpritzerPipelineConfiguration
       do {
         Uninterruptibles.sleepUninterruptibly(config.getProviderWaitMs, 
TimeUnit.MILLISECONDS)
         iterator = twitProvider.readCurrent().iterator()
-        iterator.toList.map(datum => 
ctx.collect(datum.getDocument.asInstanceOf[String]))
+        iterator.toList.map(datum => 
ctx.collect(mapper.writeValueAsString(datum.getDocument)))
       } while( twitProvider.isRunning )
     }
 
     override def cancel(): Unit = {
-      twitProvider.cleanUp()
+      close()
     }
 
-    @throws[Exception]
-    override def close(): Unit = {
-      twitProvider.cleanUp()
+    override def stop(): Unit = {
+      close();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
----------------------------------------------------------------------
diff --git 
a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
 
b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
index 867255d..ad0315a 100644
--- 
a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
+++ 
b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -128,7 +128,7 @@ class FlinkTwitterUserInformationPipeline(config: 
TwitterUserInformationPipeline
 
     LOGGER.info("StreamExecutionEnvironment: {}", env.toString )
 
-    env.execute("FlinkTwitterUserInformationPipeline")
+    env.execute(STREAMS_ID)
   }
 
   class idListWindowFunction extends WindowFunction[String, List[String], Int, 
GlobalWindow] {

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md
----------------------------------------------------------------------
diff --git 
a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md
 
b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md
index 259fe7f..1e59039 100644
--- 
a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md
+++ 
b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md
@@ -24,17 +24,17 @@ Example Configuration:
 Run (Local):
 ------------
 
-    java -cp dist/flink-twitter-collection-jar-with-dependencies.jar 
-Dconfig.file=file://<location_of_config_file> 
org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline
+    java -cp dist/flink-twitter-collection-jar-with-dependencies.jar 
-Dconfig.file=file://<location_of_config_file> 
org.apache.streams.examples.flink.twitter.collection.FlinkTwitterSpritzerPipeline
 
 Run (Flink):
 ------------
 
-    flink-run.sh dist/flink-twitter-collection-jar-with-dependencies.jar 
org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline 
http://<location_of_config_file> 
+    flink-run.sh dist/flink-twitter-collection-jar-with-dependencies.jar 
org.apache.streams.examples.flink.twitter.collection.FlinkTwitterSpritzerPipeline
 http://<location_of_config_file> 
 
 Run (YARN):
 -----------
 
-    flink-run.sh yarn dist/flink-twitter-collection-jar-with-dependencies.jar 
org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline 
http://<location_of_config_file> 
+    flink-run.sh yarn dist/flink-twitter-collection-jar-with-dependencies.jar 
org.apache.streams.examples.flink.twitter.collection.FlinkTwitterSpritzerPipeline
 http://<location_of_config_file> 
 
 [JavaDocs](apidocs/index.html "JavaDocs")
 

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/index.md 
b/flink/flink-twitter-collection/src/site/markdown/index.md
index 0f15603..24783be 100644
--- a/flink/flink-twitter-collection/src/site/markdown/index.md
+++ b/flink/flink-twitter-collection/src/site/markdown/index.md
@@ -16,11 +16,13 @@ Collects large batches of documents from api.twitter.com 
from a seed set of ids.
 Streams:
 --------
 
-<a href="FlinkTwitterUserInformationPipeline.html" 
target="_self">FlinkTwitterUserInformationPipeline</a>
+<a href="FlinkTwitterFollowingPipeline.html" 
target="_self">FlinkTwitterFollowingPipeline</a>
 
 <a href="FlinkTwitterPostsPipeline.html" 
target="_self">FlinkTwitterPostsPipeline</a>
 
-<a href="FlinkTwitterFollowingPipeline.html" 
target="_self">FlinkTwitterFollowingPipeline</a>
+<a href="FlinkTwitterSpritzerPipeline.html" 
target="_self">FlinkTwitterSpritzerPipeline</a>
+
+<a href="FlinkTwitterUserInformationPipeline.html" 
target="_self">FlinkTwitterUserInformationPipeline</a>
 
 Test:
 -----

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/site/resources/FlinkTwitterFollowingPipeline.dot
----------------------------------------------------------------------
diff --git 
a/flink/flink-twitter-collection/src/site/resources/FlinkTwitterFollowingPipeline.dot
 
b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterFollowingPipeline.dot
new file mode 100644
index 0000000..ba5e60d
--- /dev/null
+++ 
b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterFollowingPipeline.dot
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+  //source
+  source 
[label="source\nhdfs://${host}:${port}/${path}/${readerPath}",shape=tab];
+
+  //providers
+  TwitterFollowingProvider 
[label="TwitterFollowingProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java";];
+
+  //persisters
+  RollingFileSink [label="RollingFileSink",shape=ellipse];
+  
+   //data
+  destination 
[label="destination\nhdfs://${host}:${port}/${path}/${writerPath}",shape=tab];
+
+  //stream
+  TwitterFollowingProvider -> source [dir=back,style=dashed];
+  TwitterFollowingProvider -> RollingFileSink [label="String"];
+  RollingFileSink -> destination;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/site/resources/FlinkTwitterPostsPipeline.dot
----------------------------------------------------------------------
diff --git 
a/flink/flink-twitter-collection/src/site/resources/FlinkTwitterPostsPipeline.dot
 
b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterPostsPipeline.dot
new file mode 100644
index 0000000..1092ff4
--- /dev/null
+++ 
b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterPostsPipeline.dot
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+  //source
+  source 
[label="source\nhdfs://${host}:${port}/${path}/${readerPath}",shape=tab];
+
+  //providers
+  TwitterTimelineProvider 
[label="TwitterTimelineProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java";];
+
+  //persisters
+  RollingFileSink [label="RollingFileSink",shape=ellipse];
+  
+   //data
+  destination 
[label="destination\nhdfs://${host}:${port}/${path}/${writerPath}",shape=tab];
+
+  //stream
+  TwitterTimelineProvider -> source [dir=back,style=dashed];
+  TwitterTimelineProvider -> RollingFileSink [label="String"];
+  RollingFileSink -> destination;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/site/resources/FlinkTwitterSpritzerPipeline.dot
----------------------------------------------------------------------
diff --git 
a/flink/flink-twitter-collection/src/site/resources/FlinkTwitterSpritzerPipeline.dot
 
b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterSpritzerPipeline.dot
new file mode 100644
index 0000000..5a57595
--- /dev/null
+++ 
b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterSpritzerPipeline.dot
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+  //providers
+  TwitterStreamProvider 
[label="TwitterStreamProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java";];
+
+  //persisters
+  RollingFileSink [label="RollingFileSink",shape=ellipse];
+  
+   //data
+  destination [label="hdfs://${host}:${port}/${path}/${writerPath}",shape=box];
+
+  //stream
+  TwitterStreamProvider -> RollingFileSink [label="String"];
+  RollingFileSink -> destination;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/site/resources/FlinkTwitterUserInformationPipeline.dot
----------------------------------------------------------------------
diff --git 
a/flink/flink-twitter-collection/src/site/resources/FlinkTwitterUserInformationPipeline.dot
 
b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterUserInformationPipeline.dot
new file mode 100644
index 0000000..4a37234
--- /dev/null
+++ 
b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterUserInformationPipeline.dot
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+  //source
+  source 
[label="source\nhdfs://${host}:${port}/${path}/${readerPath}",shape=tab];
+
+  //providers
+  TwitterUserInformationProvider 
[label="TwitterUserInformationProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java";];
+
+  //persisters
+  RollingFileSink [label="RollingFileSink",shape=ellipse];
+  
+   //data
+  destination 
[label="destination\nhdfs://${host}:${port}/${path}/${writerPath}",shape=tab];
+
+  //stream
+  TwitterUserInformationProvider -> source [dir=back,style=dashed];
+  TwitterUserInformationProvider -> RollingFileSink [label="String"];
+  RollingFileSink -> destination;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
----------------------------------------------------------------------
diff --git 
a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
 
b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
index 87057be..3e922ab 100644
--- 
a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
+++ 
b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
@@ -10,7 +10,11 @@ destination {
   path = "target/test-classes"
   writerPath = "FlinkTwitterFollowingPipelineFollowersIT"
 }
-twitter.endpoint = friends
+twitter {
+  endpoint = followers
+  ids_only = true
+  max_items = 5000
+}
 providerWaitMs = 1000
 local = true
 test = true

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
----------------------------------------------------------------------
diff --git 
a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
 
b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
index b5212ed..038a8dc 100644
--- 
a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
+++ 
b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
@@ -10,7 +10,10 @@ destination {
   path = "target/test-classes"
   writerPath = "FlinkTwitterFollowingPipelineFriendsIT"
 }
-twitter.endpoint = friends
+twitter {
+  endpoint = friends
+  ids_only = true
+}
 providerWaitMs = 1000
 local = true
 test = true

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/resources/FlinkTwitterSpritzerPipelineIT.conf
----------------------------------------------------------------------
diff --git 
a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterSpritzerPipelineIT.conf
 
b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterSpritzerPipelineIT.conf
new file mode 100644
index 0000000..fec4769
--- /dev/null
+++ 
b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterSpritzerPipelineIT.conf
@@ -0,0 +1,15 @@
+destination {
+  fields = ["DOC"]
+  scheme = file
+  path = "target/test-classes"
+  writerPath = "FlinkTwitterSpritzerPipelineIT"
+}
+twitter {
+  endpoint = sample
+  track = [
+    "data"
+  ]
+}
+providerWaitMs = 1000
+local = true
+test = true

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
----------------------------------------------------------------------
diff --git 
a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
 
b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
index 342a850..d3663fe 100644
--- 
a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
+++ 
b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
@@ -2,7 +2,7 @@ source {
   fields = ["ID"]
   scheme = file
   path = "target/test-classes"
-  readerPath = "asf.txt"
+  readerPath = "1000twitterids.txt"
 }
 destination {
   fields = ["DOC"]

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
----------------------------------------------------------------------
diff --git 
a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
 
b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
new file mode 100644
index 0000000..f38ad92
--- /dev/null
+++ 
b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
@@ -0,0 +1,55 @@
+package org.apache.streams.examples.flink.twitter.test
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, 
StreamsConfigurator}
+import 
org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
+import 
org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline
+import org.scalatest.FlatSpec
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+import org.slf4j.{Logger, LoggerFactory}
+import org.testng.annotations.Test
+
+import scala.io.Source
+
+/**
+  * Created by sblackmon on 3/13/16.
+  */
+class FlinkTwitterFollowingPipelineFollowersIT extends FlatSpec {
+
+  private val LOGGER: Logger = 
LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineFollowersIT])
+
+  import FlinkTwitterFollowingPipeline._
+
+  @Test
+  def flinkTwitterFollowersPipelineFollowersIT = {
+
+    val reference: Config = ConfigFactory.load()
+    val conf_file: File = new 
File("target/test-classes/FlinkTwitterFollowingPipelineFollowersIT.conf")
+    assert(conf_file.exists())
+    val testResourceConfig: Config = 
ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
+
+    val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
+    val streams: StreamsConfiguration = 
StreamsConfigurator.detectConfiguration(typesafe)
+    val testConfig = new 
ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
+
+    setup(testConfig)
+
+    val job = new FlinkTwitterFollowingPipeline(config = testConfig)
+    val jobThread = new Thread(job)
+    jobThread.start
+    jobThread.join
+
+    eventually (timeout(60 seconds), interval(1 seconds)) {
+      assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + 
testConfig.getDestination.getWriterPath)))
+      assert(
+        Source.fromFile(testConfig.getDestination.getPath + "/" + 
testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
+          > 4000)
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
----------------------------------------------------------------------
diff --git 
a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
 
b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
new file mode 100644
index 0000000..464e743
--- /dev/null
+++ 
b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
@@ -0,0 +1,59 @@
+package com.peoplepattern.streams.twitter.collection
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
+import 
org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, 
StreamsConfigurator}
+import 
org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
+import 
org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterFollowingPipeline,
 FlinkTwitterSpritzerPipeline}
+import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, 
HdfsWriterConfiguration}
+import org.junit.Ignore
+import org.slf4j.{Logger, LoggerFactory}
+import org.testng.annotations.Test
+
+import scala.io.Source
+import org.scalatest.FlatSpec
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.{Seconds, Span}
+import org.scalatest.time.SpanSugar._
+
+/**
+  * Created by sblackmon on 3/13/16.
+  */
+class FlinkTwitterFollowingPipelineFriendsIT extends FlatSpec {
+
+  private val LOGGER: Logger = 
LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineFriendsIT])
+
+  import FlinkTwitterFollowingPipeline._
+
+  @Test
+  def flinkTwitterFollowersPipelineFriendsIT = {
+
+    val reference: Config = ConfigFactory.load()
+    val conf_file: File = new 
File("target/test-classes/FlinkTwitterFollowingPipelineFriendsIT.conf")
+    assert(conf_file.exists())
+    val testResourceConfig: Config = 
ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
+
+    val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
+    val streams: StreamsConfiguration = 
StreamsConfigurator.detectConfiguration(typesafe)
+    val testConfig = new 
ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
+
+    setup(testConfig)
+
+    val job = new FlinkTwitterFollowingPipeline(config = testConfig)
+    val jobThread = new Thread(job)
+    jobThread.start
+    jobThread.join
+
+    eventually (timeout(60 seconds), interval(1 seconds)) {
+      assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + 
testConfig.getDestination.getWriterPath)))
+      assert(
+        Source.fromFile(testConfig.getDestination.getPath + "/" + 
testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
+          > 90)
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
----------------------------------------------------------------------
diff --git 
a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
 
b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
deleted file mode 100644
index e6294f6..0000000
--- 
a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-package com.peoplepattern.streams.twitter.collection
-
-import java.io.File
-import java.nio.file.{Files, Paths}
-
-import 
org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
-import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, 
StreamsConfigurator}
-import 
org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
-import 
org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterFollowingPipeline,
 FlinkTwitterSpritzerPipeline}
-import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, 
HdfsWriterConfiguration}
-import org.slf4j.{Logger, LoggerFactory}
-import org.testng.annotations.Test
-
-import scala.io.Source
-import org.scalatest.FlatSpec
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.{Seconds, Span}
-import org.scalatest.time.SpanSugar._
-
-/**
-  * Created by sblackmon on 3/13/16.
-  */
-class FlinkTwitterFollowingPipelineIT extends FlatSpec {
-
-  private val LOGGER: Logger = 
LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineIT])
-
-  import FlinkTwitterFollowingPipeline._
-
-  @Test(enabled = false)
-  def flinkTwitterFollowersPipelineFriendsIT = {
-
-    val reference: Config = ConfigFactory.load()
-    val conf_file: File = new 
File("target/test-classes/FlinkTwitterFollowingPipelineFollowersIT.conf")
-    assert(conf_file.exists())
-    val testResourceConfig: Config = 
ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
-
-    val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
-    val streams: StreamsConfiguration = 
StreamsConfigurator.detectConfiguration(typesafe)
-    val testConfig = new 
ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
-
-    setup(testConfig)
-
-    val job = new FlinkTwitterFollowingPipeline(config = testConfig)
-    val jobThread = new Thread(job)
-    jobThread.start
-    jobThread.join
-
-    eventually (timeout(60 seconds), interval(1 seconds)) {
-      assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + 
testConfig.getDestination.getWriterPath)))
-      assert(
-        Source.fromFile(testConfig.getDestination.getPath + "/" + 
testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
-          > 90)
-    }
-
-  }
-
-  @Test(enabled = false)
-  def flinkTwitterFollowersPipelineFollowersIT = {
-
-    val reference: Config = ConfigFactory.load()
-    val conf_file: File = new 
File("target/test-classes/FlinkTwitterFollowingPipelineFriendsIT.conf")
-    assert(conf_file.exists())
-    val testResourceConfig: Config = 
ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
-
-    val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
-    val streams: StreamsConfiguration = 
StreamsConfigurator.detectConfiguration(typesafe)
-    val testConfig = new 
ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
-
-    setup(testConfig)
-
-    val job = new FlinkTwitterFollowingPipeline(config = testConfig)
-    val jobThread = new Thread(job)
-    jobThread.start
-    jobThread.join
-
-    eventually (timeout(60 seconds), interval(1 seconds)) {
-      assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + 
testConfig.getDestination.getWriterPath)))
-      assert(
-        Source.fromFile(testConfig.getDestination.getPath + "/" + 
testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
-          > 500)
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
----------------------------------------------------------------------
diff --git 
a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
 
b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
index f083f65..2e2e9b1 100644
--- 
a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
+++ 
b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
@@ -26,7 +26,7 @@ class FlinkTwitterSpritzerPipelineIT extends FlatSpec {
 
   import FlinkTwitterSpritzerPipeline._
 
-  @Test(enabled = false)
+  @Test
   def flinkTwitterSpritzerPipelineIT = {
 
     val reference: Config = ConfigFactory.load()
@@ -43,13 +43,14 @@ class FlinkTwitterSpritzerPipelineIT extends FlatSpec {
     val job = new FlinkTwitterSpritzerPipeline(config = testConfig)
     val jobThread = new Thread(job)
     jobThread.start
-    jobThread.join
+    jobThread.join(30000)
+    job.stop()
 
-    eventually (timeout(30 seconds), interval(1 seconds)) {
+    eventually (timeout(60 seconds), interval(1 seconds)) {
       assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + 
testConfig.getDestination.getWriterPath)))
       assert(
         Source.fromFile(testConfig.getDestination.getPath + "/" + 
testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
-          >= 200)
+          >= 10)
     }
 
   }


Reply via email to