This is an automated email from the ASF dual-hosted git repository.

sblackmon pushed a commit to branch STREAMS-676
in repository https://gitbox.apache.org/repos/asf/streams.git

commit f47c42c5df3410387aba291dad48a4558869242f
Author: sblackmon <sblack...@apache.org>
AuthorDate: Mon Oct 12 17:33:49 2020 -0500

    resolves STREAMS-676
    
    remove unnecessary keying and use GlobalWindow in streams-examples-flink/flink-twitter-collection
---
 .../flink-twitter-collection/pom.xml               | 15 ++++++++++
 .../main/jsonschema/StreamsFlinkConfiguration.json |  4 +--
 .../apache/streams/examples/flink/FlinkBase.scala  | 11 ++++++++
 .../collection/FlinkTwitterFollowingPipeline.scala | 13 +++++----
 .../collection/FlinkTwitterPostsPipeline.scala     | 19 +++++--------
 .../FlinkTwitterUserInformationPipeline.scala      | 32 ++++------------------
 .../FollowingCollectorFlatMapFunction.scala        |  8 +++---
 .../TimelineCollectorFlatMapFunction.scala         |  8 +++---
 8 files changed, 55 insertions(+), 55 deletions(-)

diff --git 
a/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml 
b/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml
index 467536b..dee8dfc 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml
@@ -95,6 +95,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
+            <artifactId>streams-converters</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
             <artifactId>streams-provider-twitter</artifactId>
             <version>${project.version}</version>
         </dependency>
@@ -402,6 +407,16 @@
                 <dependencies>
                     <dependency>
                         <groupId>org.apache.streams</groupId>
+                        <artifactId>streams-provider-twitter</artifactId>
+                        <version>${project.version}</version>
+                    </dependency>
+                    <dependency>
+                        <groupId>org.apache.streams</groupId>
+                        <artifactId>streams-converters</artifactId>
+                        <version>${project.version}</version>
+                    </dependency>
+                    <dependency>
+                        <groupId>org.apache.streams</groupId>
                         <artifactId>streams-persist-hdfs</artifactId>
                         <version>${project.version}</version>
                     </dependency>
diff --git 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json
 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json
index 7c6291e..b45ec14 100644
--- 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json
+++ 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json
@@ -12,10 +12,10 @@
   },
   "additionalProperties": false,
   "properties": {
-    "test": {
+    "local": {
       "type": "boolean"
     },
-    "local": {
+    "test": {
       "type": "boolean"
     }
   }
diff --git 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
index 980f5eb..a2123b8 100644
--- 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
+++ 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
@@ -29,6 +29,9 @@ import org.apache.flink.streaming.api.CheckpointingMode
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
+import org.apache.flink.util.Collector
 import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
 import org.apache.streams.flink.{FlinkBatchConfiguration, 
FlinkStreamingConfiguration, StreamsFlinkConfiguration}
 import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, 
HdfsWriterConfiguration}
@@ -46,6 +49,14 @@ object FlinkBase {
       input.substring(input.lastIndexOf(':')+1)
     else input
   }
+
+  class idListWindowFunction extends AllWindowFunction[String, List[String], 
GlobalWindow] {
+    override def apply(window: GlobalWindow, input: Iterable[String], out: 
Collector[List[String]]): Unit = {
+      if( input.nonEmpty )
+        out.collect(input.map(id => toProviderId(id)).toList)
+    }
+  }
+
 }
 
 trait FlinkBase {
diff --git 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
index 4cda31e..b9ef451 100644
--- 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
+++ 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -29,12 +29,14 @@ import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.core.fs.Path
 import org.apache.flink.streaming.api.TimeCharacteristic
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.flink.streaming.api.scala.AllWindowedStream
 import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.streaming.api.scala.KeyedStream
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
 import org.apache.streams.config.StreamsConfigurator
-import org.apache.streams.core.StreamsDatum
 import org.apache.streams.examples.flink.FlinkBase
+import org.apache.streams.examples.flink.FlinkBase.idListWindowFunction
 import 
org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
 import org.apache.streams.jackson.StreamsJacksonMapper
 import org.apache.streams.twitter.pojo.Follow
@@ -129,13 +131,12 @@ class FlinkTwitterFollowingPipeline(config: 
TwitterFollowingPipelineConfiguratio
 
     val ids: DataStream[String] = env.readTextFile(inPath)
 
-    val keyed_ids: KeyedStream[String, Int] = ids.
-      name("keyed_ids").
-      setParallelism(streamsConfig.getParallelism().toInt).
-      keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs )
+    val idWindows: AllWindowedStream[String, GlobalWindow] = 
ids.countWindowAll(100)
+
+    val idLists: DataStream[List[String]] = idWindows.apply[List[String]] (new 
idListWindowFunction()).name("idLists")
 
     // these datums contain 'Follow' objects
-    val follows: DataStream[Follow] = keyed_ids.
+    val follows: DataStream[Follow] = idLists.
       flatMap(new FollowingCollectorFlatMapFunction(streamsConfig, 
config.getTwitter, streamsFlinkConfiguration)).
       name("follows")
 
diff --git 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
index 6d49a5a..24ee9ce 100644
--- 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
+++ 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -19,39 +19,34 @@
 package org.apache.streams.examples.flink.twitter.collection
 
 import java.util.Objects
-import java.util.concurrent.TimeUnit
 
 import com.fasterxml.jackson.databind.ObjectMapper
-import com.google.common.util.concurrent.Uninterruptibles
 import org.apache.commons.lang3.StringUtils
 import org.apache.flink.api.common.JobExecutionResult
-import org.apache.flink.api.common.functions.RichFlatMapFunction
 import org.apache.flink.api.common.serialization.SimpleStringEncoder
 import org.apache.flink.api.scala._
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.core.fs.Path
 import org.apache.flink.streaming.api.TimeCharacteristic
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.flink.streaming.api.scala.AllWindowedStream
 import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.streaming.api.scala.KeyedStream
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.util.Collector
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
 import org.apache.streams.config.ComponentConfigurator
 import org.apache.streams.config.StreamsConfigurator
-import org.apache.streams.core.StreamsDatum
 import org.apache.streams.examples.flink.FlinkBase
+import org.apache.streams.examples.flink.FlinkBase.idListWindowFunction
 import 
org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration
 import org.apache.streams.hdfs.HdfsReaderConfiguration
 import org.apache.streams.hdfs.HdfsWriterConfiguration
 import org.apache.streams.jackson.StreamsJacksonMapper
 import org.apache.streams.twitter.pojo.Tweet
-import org.apache.streams.twitter.provider.TwitterTimelineProvider
 import org.hamcrest.MatcherAssert
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
-import scala.collection.JavaConversions._
-
 /**
   * FlinkTwitterPostsPipeline collects recent posts from all profiles from a
   * set of IDs, writing each post as a twitter:status in json format to dfs.
@@ -138,11 +133,11 @@ class FlinkTwitterPostsPipeline(config: 
TwitterPostsPipelineConfiguration = new
 
     val ids: DataStream[String] = env.readTextFile(inPath).name("ids")
 
-    val keyed_ids: KeyedStream[String, Int] = ids.name("keyed_ids").
-      setParallelism(streamsConfig.getParallelism().toInt).
-      keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs )
+    val idWindows: AllWindowedStream[String, GlobalWindow] = 
ids.countWindowAll(100)
+
+    val idLists: DataStream[List[String]] = idWindows.apply[List[String]] (new 
idListWindowFunction()).name("idLists")
 
-    val tweets: DataStream[Tweet] = keyed_ids.
+    val tweets: DataStream[Tweet] = idLists.
       flatMap(new TimelineCollectorFlatMapFunction(streamsConfig, 
config.getTwitter, streamsFlinkConfiguration)).
       setParallelism(env.getParallelism).
       name("tweets")
diff --git 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
index 45f34ff..99d2bf4 100644
--- 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
+++ 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -19,42 +19,33 @@
 package org.apache.streams.examples.flink.twitter.collection
 
 import java.util.Objects
-import java.util.concurrent.TimeUnit
 
 import com.fasterxml.jackson.databind.ObjectMapper
-import com.google.common.util.concurrent.Uninterruptibles
 import org.apache.commons.lang3.StringUtils
 import org.apache.flink.api.common.JobExecutionResult
-import org.apache.flink.api.common.functions.RichFlatMapFunction
 import org.apache.flink.api.common.serialization.SimpleStringEncoder
 import org.apache.flink.api.scala._
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.core.fs.Path
 import org.apache.flink.streaming.api.TimeCharacteristic
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.flink.streaming.api.scala.AllWindowedStream
 import org.apache.flink.streaming.api.scala.DataStream
-import org.apache.flink.streaming.api.scala.KeyedStream
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.scala.WindowedStream
-import org.apache.flink.streaming.api.scala.function.WindowFunction
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
-import org.apache.flink.util.Collector
 import org.apache.streams.config.ComponentConfigurator
 import org.apache.streams.config.StreamsConfigurator
 import org.apache.streams.examples.flink.FlinkBase
-import org.apache.streams.examples.flink.FlinkBase.toProviderId
+import org.apache.streams.examples.flink.FlinkBase.idListWindowFunction
 import 
org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration
 import org.apache.streams.hdfs.HdfsReaderConfiguration
 import org.apache.streams.hdfs.HdfsWriterConfiguration
 import org.apache.streams.jackson.StreamsJacksonMapper
 import org.apache.streams.twitter.pojo.User
-import org.apache.streams.twitter.provider.TwitterUserInformationProvider
 import org.hamcrest.MatcherAssert
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
-import scala.collection.JavaConversions._
-
 /**
   * FlinkTwitterPostsPipeline collects the current user profile of a
   * set of IDs, writing each as a twitter:user in json format to dfs.
@@ -142,9 +133,7 @@ class FlinkTwitterUserInformationPipeline(config: 
TwitterUserInformationPipeline
 
     val ids: DataStream[String] = 
env.readTextFile(inPath).setParallelism(env.getParallelism).name("ids")
 
-    val keyed_ids: KeyedStream[String, Int] = ids.name("keyed_ids").keyBy( id 
=> (id.hashCode % 100).abs )
-
-    val idWindows: WindowedStream[String, Int, GlobalWindow] = 
keyed_ids.countWindow(100)
+    val idWindows: AllWindowedStream[String, GlobalWindow] = 
ids.countWindowAll(100)
 
     val idLists: DataStream[List[String]] = idWindows.apply[List[String]] (new 
idListWindowFunction()).name("idLists")
 
@@ -156,19 +145,15 @@ class FlinkTwitterUserInformationPipeline(config: 
TwitterUserInformationPipeline
         MAPPER.writeValueAsString(user)
       }).name("jsons")
 
-    val keyed_jsons: KeyedStream[String, Int] = jsons.
-      setParallelism(streamsConfig.getParallelism().toInt).
-      keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs )
-
     val fileSink : StreamingFileSink[String] = StreamingFileSink.
       forRowFormat(new Path(outPath), new 
SimpleStringEncoder[String]("UTF-8")).
       withRollingPolicy(rollingPolicy).
       withBucketAssigner(basePathBucketAssigner).build();
 
     if( config.getTest == true ) {
-      keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+      jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
     } else {
-      keyed_jsons.addSink(fileSink).name("fileSink")
+      jsons.addSink(fileSink).name("fileSink")
     }
 
     val result: JobExecutionResult = 
env.execute("FlinkTwitterUserInformationPipeline")
@@ -182,11 +167,4 @@ class FlinkTwitterUserInformationPipeline(config: 
TwitterUserInformationPipeline
 
   }
 
-  class idListWindowFunction extends WindowFunction[String, List[String], Int, 
GlobalWindow] {
-    override def apply(key: Int, window: GlobalWindow, input: 
Iterable[String], out: Collector[List[String]]): Unit = {
-      if( input.nonEmpty )
-        out.collect(input.map(id => toProviderId(id)).toList)
-    }
-  }
-
 }
diff --git 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FollowingCollectorFlatMapFunction.scala
 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FollowingCollectorFlatMapFunction.scala
index 83d1275..f2d435b 100644
--- 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FollowingCollectorFlatMapFunction.scala
+++ 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FollowingCollectorFlatMapFunction.scala
@@ -22,7 +22,7 @@ class FollowingCollectorFlatMapFunction(
                                          streamsConfiguration : 
StreamsConfiguration,
                                          twitterConfiguration : 
TwitterFollowingConfiguration = new 
ComponentConfigurator(classOf[TwitterFollowingConfiguration]).detectConfiguration(),
                                          flinkConfiguration : 
StreamsFlinkConfiguration = new 
ComponentConfigurator(classOf[StreamsFlinkConfiguration]).detectConfiguration()
-                                       ) extends RichFlatMapFunction[String, 
Follow] with Serializable {
+                                       ) extends 
RichFlatMapFunction[List[String], Follow] with Serializable {
 
   var userids : IntCounter = new IntCounter()
   var follows : IntCounter = new IntCounter()
@@ -32,13 +32,13 @@ class FollowingCollectorFlatMapFunction(
     
getRuntimeContext().addAccumulator("FlinkTwitterFollowingPipeline.follows", 
this.follows)
   }
 
-  override def flatMap(input: String, out: Collector[Follow]): Unit = {
+  override def flatMap(input: List[String], out: Collector[Follow]): Unit = {
     userids.add(input.size)
     collectConnections(input, out)
   }
 
-  def collectConnections(id : String, out : Collector[Follow]) = {
-    val conf = 
twitterConfiguration.withInfo(List(toProviderId(id))).asInstanceOf[TwitterFollowingConfiguration]
+  def collectConnections(ids : List[String], out : Collector[Follow]) = {
+    val conf = twitterConfiguration.withInfo(ids.map(toProviderId(_)))
     val twitProvider: TwitterFollowingProvider = new 
TwitterFollowingProvider(conf)
     twitProvider.prepare(twitProvider)
     twitProvider.startStream()
diff --git 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/TimelineCollectorFlatMapFunction.scala
 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/TimelineCollectorFlatMapFunction.scala
index bbf70a5..672bab1 100644
--- 
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/TimelineCollectorFlatMapFunction.scala
+++ 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/TimelineCollectorFlatMapFunction.scala
@@ -24,20 +24,20 @@ class TimelineCollectorFlatMapFunction(
                                         streamsConfiguration : 
StreamsConfiguration,
                                         twitterConfiguration : 
TwitterTimelineProviderConfiguration,
                                         streamsFlinkConfiguration : 
StreamsFlinkConfiguration
-                                      ) extends RichFlatMapFunction[String, 
Tweet] with Serializable {
+                                      ) extends 
RichFlatMapFunction[List[String], Tweet] with Serializable {
   var userids : IntCounter = new IntCounter()
   var posts : IntCounter = new IntCounter()
   override def open(parameters: Configuration): Unit = {
     
getRuntimeContext().addAccumulator("TimelineCollectorFlatMapFunction.userids", 
this.userids)
     
getRuntimeContext().addAccumulator("TimelineCollectorFlatMapFunction.posts", 
this.posts)
   }
-  override def flatMap(input: String, out: Collector[Tweet]): Unit = {
+  override def flatMap(input: List[String], out: Collector[Tweet]): Unit = {
     userids.add(input.size)
     collectPosts(input, out)
   }
-  def collectPosts(id : String, out : Collector[Tweet]) = {
+  def collectPosts(ids : List[String], out : Collector[Tweet]) = {
     try {
-      val conf = 
twitterConfiguration.withInfo(List(toProviderId(id))).asInstanceOf[TwitterTimelineProviderConfiguration]
+      val conf = twitterConfiguration.withInfo(ids.map(toProviderId(_)))
       val twitProvider: TwitterTimelineProvider = new 
TwitterTimelineProvider(conf)
       twitProvider.prepare(conf)
       twitProvider.startStream()

Reply via email to