This is an automated email from the ASF dual-hosted git repository. sblackmon pushed a commit to branch STREAMS-676 in repository https://gitbox.apache.org/repos/asf/streams.git
commit f47c42c5df3410387aba291dad48a4558869242f Author: sblackmon <sblack...@apache.org> AuthorDate: Mon Oct 12 17:33:49 2020 -0500 resolves STREAMS-676 remove unnecessary keying and use GlobalWindow in streams-examples-flink/flink-twitter-collection --- .../flink-twitter-collection/pom.xml | 15 ++++++++++ .../main/jsonschema/StreamsFlinkConfiguration.json | 4 +-- .../apache/streams/examples/flink/FlinkBase.scala | 11 ++++++++ .../collection/FlinkTwitterFollowingPipeline.scala | 13 +++++---- .../collection/FlinkTwitterPostsPipeline.scala | 19 +++++-------- .../FlinkTwitterUserInformationPipeline.scala | 32 ++++------------------ .../FollowingCollectorFlatMapFunction.scala | 8 +++--- .../TimelineCollectorFlatMapFunction.scala | 8 +++--- 8 files changed, 55 insertions(+), 55 deletions(-) diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml b/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml index 467536b..dee8dfc 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml @@ -95,6 +95,11 @@ </dependency> <dependency> <groupId>org.apache.streams</groupId> + <artifactId>streams-converters</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> <artifactId>streams-provider-twitter</artifactId> <version>${project.version}</version> </dependency> @@ -402,6 +407,16 @@ <dependencies> <dependency> <groupId>org.apache.streams</groupId> + <artifactId>streams-provider-twitter</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-converters</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> <artifactId>streams-persist-hdfs</artifactId> <version>${project.version}</version> </dependency> diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json index 7c6291e..b45ec14 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json @@ -12,10 +12,10 @@ }, "additionalProperties": false, "properties": { - "test": { + "local": { "type": "boolean" }, - "local": { + "test": { "type": "boolean" } } diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala index 980f5eb..a2123b8 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala @@ -29,6 +29,9 @@ import org.apache.flink.streaming.api.CheckpointingMode import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.scala.function.AllWindowFunction +import org.apache.flink.streaming.api.windowing.windows.GlobalWindow +import org.apache.flink.util.Collector import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator} import org.apache.streams.flink.{FlinkBatchConfiguration, FlinkStreamingConfiguration, StreamsFlinkConfiguration} import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration} @@ -46,6 +49,14 @@ object FlinkBase { input.substring(input.lastIndexOf(':')+1) else input } + + class idListWindowFunction extends AllWindowFunction[String, List[String], GlobalWindow] { + override def apply(window: GlobalWindow, input: Iterable[String], out: Collector[List[String]]): Unit = { + if( input.nonEmpty ) + out.collect(input.map(id => toProviderId(id)).toList) + } + } + } trait FlinkBase { diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala index 4cda31e..b9ef451 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala @@ -29,12 +29,14 @@ import org.apache.flink.core.fs.FileSystem import org.apache.flink.core.fs.Path import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink +import org.apache.flink.streaming.api.scala.AllWindowedStream import org.apache.flink.streaming.api.scala.DataStream import org.apache.flink.streaming.api.scala.KeyedStream import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.windowing.windows.GlobalWindow import org.apache.streams.config.StreamsConfigurator -import org.apache.streams.core.StreamsDatum import org.apache.streams.examples.flink.FlinkBase +import org.apache.streams.examples.flink.FlinkBase.idListWindowFunction import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration import org.apache.streams.jackson.StreamsJacksonMapper import org.apache.streams.twitter.pojo.Follow @@ -129,13 +131,12 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio val ids: DataStream[String] = env.readTextFile(inPath) - val keyed_ids: KeyedStream[String, Int] = ids. - name("keyed_ids"). - setParallelism(streamsConfig.getParallelism().toInt). - keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs ) + val idWindows: AllWindowedStream[String, GlobalWindow] = ids.countWindowAll(100) + + val idLists: DataStream[List[String]] = idWindows.apply[List[String]] (new idListWindowFunction()).name("idLists") // these datums contain 'Follow' objects - val follows: DataStream[Follow] = keyed_ids. + val follows: DataStream[Follow] = idLists. flatMap(new FollowingCollectorFlatMapFunction(streamsConfig, config.getTwitter, streamsFlinkConfiguration)). name("follows") diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala index 6d49a5a..24ee9ce 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala @@ -19,39 +19,34 @@ package org.apache.streams.examples.flink.twitter.collection import java.util.Objects -import java.util.concurrent.TimeUnit import com.fasterxml.jackson.databind.ObjectMapper -import com.google.common.util.concurrent.Uninterruptibles import org.apache.commons.lang3.StringUtils import org.apache.flink.api.common.JobExecutionResult -import org.apache.flink.api.common.functions.RichFlatMapFunction import org.apache.flink.api.common.serialization.SimpleStringEncoder import org.apache.flink.api.scala._ import org.apache.flink.core.fs.FileSystem import org.apache.flink.core.fs.Path import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink +import org.apache.flink.streaming.api.scala.AllWindowedStream import org.apache.flink.streaming.api.scala.DataStream import org.apache.flink.streaming.api.scala.KeyedStream import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment -import org.apache.flink.util.Collector +import org.apache.flink.streaming.api.windowing.windows.GlobalWindow import org.apache.streams.config.ComponentConfigurator import org.apache.streams.config.StreamsConfigurator -import org.apache.streams.core.StreamsDatum import org.apache.streams.examples.flink.FlinkBase +import org.apache.streams.examples.flink.FlinkBase.idListWindowFunction import org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration import org.apache.streams.hdfs.HdfsReaderConfiguration import org.apache.streams.hdfs.HdfsWriterConfiguration import org.apache.streams.jackson.StreamsJacksonMapper import org.apache.streams.twitter.pojo.Tweet -import org.apache.streams.twitter.provider.TwitterTimelineProvider import org.hamcrest.MatcherAssert import org.slf4j.Logger import org.slf4j.LoggerFactory -import scala.collection.JavaConversions._ - /** * FlinkTwitterPostsPipeline collects recent posts from all profiles from a * set of IDs, writing each post as a twitter:status in json format to dfs. @@ -138,11 +133,11 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new val ids: DataStream[String] = env.readTextFile(inPath).name("ids") - val keyed_ids: KeyedStream[String, Int] = ids.name("keyed_ids"). - setParallelism(streamsConfig.getParallelism().toInt). - keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs ) + val idWindows: AllWindowedStream[String, GlobalWindow] = ids.countWindowAll(100) + + val idLists: DataStream[List[String]] = idWindows.apply[List[String]] (new idListWindowFunction()).name("idLists") - val tweets: DataStream[Tweet] = keyed_ids. + val tweets: DataStream[Tweet] = idLists. flatMap(new TimelineCollectorFlatMapFunction(streamsConfig, config.getTwitter, streamsFlinkConfiguration)). setParallelism(env.getParallelism). name("tweets") diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala index 45f34ff..99d2bf4 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala @@ -19,42 +19,33 @@ package org.apache.streams.examples.flink.twitter.collection import java.util.Objects -import java.util.concurrent.TimeUnit import com.fasterxml.jackson.databind.ObjectMapper -import com.google.common.util.concurrent.Uninterruptibles import org.apache.commons.lang3.StringUtils import org.apache.flink.api.common.JobExecutionResult -import org.apache.flink.api.common.functions.RichFlatMapFunction import org.apache.flink.api.common.serialization.SimpleStringEncoder import org.apache.flink.api.scala._ import org.apache.flink.core.fs.FileSystem import org.apache.flink.core.fs.Path import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink +import org.apache.flink.streaming.api.scala.AllWindowedStream import org.apache.flink.streaming.api.scala.DataStream -import org.apache.flink.streaming.api.scala.KeyedStream import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment -import org.apache.flink.streaming.api.scala.WindowedStream -import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.windowing.windows.GlobalWindow -import org.apache.flink.util.Collector import org.apache.streams.config.ComponentConfigurator import org.apache.streams.config.StreamsConfigurator import org.apache.streams.examples.flink.FlinkBase -import org.apache.streams.examples.flink.FlinkBase.toProviderId +import org.apache.streams.examples.flink.FlinkBase.idListWindowFunction import org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration import org.apache.streams.hdfs.HdfsReaderConfiguration import org.apache.streams.hdfs.HdfsWriterConfiguration import org.apache.streams.jackson.StreamsJacksonMapper import org.apache.streams.twitter.pojo.User -import org.apache.streams.twitter.provider.TwitterUserInformationProvider import org.hamcrest.MatcherAssert import org.slf4j.Logger import org.slf4j.LoggerFactory -import scala.collection.JavaConversions._ - /** * FlinkTwitterPostsPipeline collects the current user profile of a * set of IDs, writing each as a twitter:user in json format to dfs. @@ -142,9 +133,7 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline val ids: DataStream[String] = env.readTextFile(inPath).setParallelism(env.getParallelism).name("ids") - val keyed_ids: KeyedStream[String, Int] = ids.name("keyed_ids").keyBy( id => (id.hashCode % 100).abs ) - - val idWindows: WindowedStream[String, Int, GlobalWindow] = keyed_ids.countWindow(100) + val idWindows: AllWindowedStream[String, GlobalWindow] = ids.countWindowAll(100) val idLists: DataStream[List[String]] = idWindows.apply[List[String]] (new idListWindowFunction()).name("idLists") @@ -156,19 +145,15 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline MAPPER.writeValueAsString(user) }).name("jsons") - val keyed_jsons: KeyedStream[String, Int] = jsons. - setParallelism(streamsConfig.getParallelism().toInt). - keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs ) - val fileSink : StreamingFileSink[String] = StreamingFileSink. forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")). withRollingPolicy(rollingPolicy). withBucketAssigner(basePathBucketAssigner).build(); if( config.getTest == true ) { - keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE) + jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE) } else { - keyed_jsons.addSink(fileSink).name("fileSink") + jsons.addSink(fileSink).name("fileSink") } val result: JobExecutionResult = env.execute("FlinkTwitterUserInformationPipeline") @@ -182,11 +167,4 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline } - class idListWindowFunction extends WindowFunction[String, List[String], Int, GlobalWindow] { - override def apply(key: Int, window: GlobalWindow, input: Iterable[String], out: Collector[List[String]]): Unit = { - if( input.nonEmpty ) - out.collect(input.map(id => toProviderId(id)).toList) - } - } - } diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FollowingCollectorFlatMapFunction.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FollowingCollectorFlatMapFunction.scala index 83d1275..f2d435b 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FollowingCollectorFlatMapFunction.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FollowingCollectorFlatMapFunction.scala @@ -22,7 +22,7 @@ class FollowingCollectorFlatMapFunction( streamsConfiguration : StreamsConfiguration, twitterConfiguration : TwitterFollowingConfiguration = new ComponentConfigurator(classOf[TwitterFollowingConfiguration]).detectConfiguration(), flinkConfiguration : StreamsFlinkConfiguration = new ComponentConfigurator(classOf[StreamsFlinkConfiguration]).detectConfiguration() - ) extends RichFlatMapFunction[String, Follow] with Serializable { + ) extends RichFlatMapFunction[List[String], Follow] with Serializable { var userids : IntCounter = new IntCounter() var follows : IntCounter = new IntCounter() @@ -32,13 +32,13 @@ class FollowingCollectorFlatMapFunction( getRuntimeContext().addAccumulator("FlinkTwitterFollowingPipeline.follows", this.follows) } - override def flatMap(input: String, out: Collector[Follow]): Unit = { + override def flatMap(input: List[String], out: Collector[Follow]): Unit = { userids.add(input.size) collectConnections(input, out) } - def collectConnections(id : String, out : Collector[Follow]) = { - val conf = twitterConfiguration.withInfo(List(toProviderId(id))).asInstanceOf[TwitterFollowingConfiguration] + def collectConnections(ids : List[String], out : Collector[Follow]) = { + val conf = twitterConfiguration.withInfo(ids.map(toProviderId(_))) val twitProvider: TwitterFollowingProvider = new TwitterFollowingProvider(conf) twitProvider.prepare(twitProvider) twitProvider.startStream() diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/TimelineCollectorFlatMapFunction.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/TimelineCollectorFlatMapFunction.scala index bbf70a5..672bab1 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/TimelineCollectorFlatMapFunction.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/TimelineCollectorFlatMapFunction.scala @@ -24,20 +24,20 @@ class TimelineCollectorFlatMapFunction( streamsConfiguration : StreamsConfiguration, twitterConfiguration : TwitterTimelineProviderConfiguration, streamsFlinkConfiguration : StreamsFlinkConfiguration - ) extends RichFlatMapFunction[String, Tweet] with Serializable { + ) extends RichFlatMapFunction[List[String], Tweet] with Serializable { var userids : IntCounter = new IntCounter() var posts : IntCounter = new IntCounter() override def open(parameters: Configuration): Unit = { getRuntimeContext().addAccumulator("TimelineCollectorFlatMapFunction.userids", this.userids) getRuntimeContext().addAccumulator("TimelineCollectorFlatMapFunction.posts", this.posts) } - override def flatMap(input: String, out: Collector[Tweet]): Unit = { + override def flatMap(input: List[String], out: Collector[Tweet]): Unit = { userids.add(input.size) collectPosts(input, out) } - def collectPosts(id : String, out : Collector[Tweet]) = { + def collectPosts(ids : List[String], out : Collector[Tweet]) = { try { - val conf = twitterConfiguration.withInfo(List(toProviderId(id))).asInstanceOf[TwitterTimelineProviderConfiguration] + val conf = twitterConfiguration.withInfo(ids.map(toProviderId(_))) val twitProvider: TwitterTimelineProvider = new TwitterTimelineProvider(conf) twitProvider.prepare(conf) twitProvider.startStream()