Trivial fix to trigger GitHub mirroring resync
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/8d1c9fad Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/8d1c9fad Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/8d1c9fad Branch: refs/heads/master Commit: 8d1c9fad609125ce8cd34ab164e2de073562ff5f Parents: 64f8d6e Author: smarthi <[email protected]> Authored: Wed Oct 26 21:33:26 2016 -0400 Committer: smarthi <[email protected]> Committed: Wed Oct 26 21:33:26 2016 -0400 ---------------------------------------------------------------------- .../streams/examples/flink/FlinkBase.scala | 32 ++++++++++---------- .../FlinkTwitterFollowingPipeline.scala | 14 +++------ .../collection/FlinkTwitterPostsPipeline.scala | 11 +++---- .../FlinkTwitterSpritzerPipeline.scala | 15 ++++----- .../FlinkTwitterUserInformationPipeline.scala | 14 ++++----- 5 files changed, 36 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/8d1c9fad/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala index a94dd61..49ca5b9 100644 --- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala +++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala @@ -50,7 +50,7 @@ trait FlinkBase { */ def main(args: Array[String]): Unit = { // if only one argument, use it as the config URL - if( args.size > 0 ) { + if( args.length > 0 ) { BASELOGGER.info("Args: {}", args) configUrl = 
args(0) setup(configUrl) @@ -79,7 +79,7 @@ trait FlinkBase { typesafe = StreamsConfigurator.getConfig } - return setup(typesafe) + setup(typesafe) } @@ -91,13 +91,13 @@ trait FlinkBase { if( this.typesafe.getString("mode").equals("streaming")) { val streamingConfiguration: FlinkStreamingConfiguration = new ComponentConfigurator[FlinkStreamingConfiguration](classOf[FlinkStreamingConfiguration]).detectConfiguration(typesafe) - return setupStreaming(streamingConfiguration) + setupStreaming(streamingConfiguration) } else if( this.typesafe.getString("mode").equals("batch")) { val batchConfiguration: FlinkBatchConfiguration = new ComponentConfigurator[FlinkBatchConfiguration](classOf[FlinkBatchConfiguration]).detectConfiguration(typesafe) - return setupBatch(batchConfiguration) + setupBatch(batchConfiguration) } else { - return false; + false } } @@ -123,7 +123,7 @@ trait FlinkBase { if( streamExecutionEnvironment == null ) streamExecutionEnvironment = streamEnvironment(streamingConfiguration) - return false + false } @@ -138,17 +138,17 @@ trait FlinkBase { if( executionEnvironment == null ) executionEnvironment = batchEnvironment(batchConfiguration) - return true + true } def batchEnvironment(config: FlinkBatchConfiguration = new FlinkBatchConfiguration()) : ExecutionEnvironment = { if (config.getTest == false && config.getLocal == false) { val env = ExecutionEnvironment.getExecutionEnvironment - return env + env } else { val env = ExecutionEnvironment.createLocalEnvironment(config.getParallel.toInt) - return env + env } } @@ -156,7 +156,7 @@ trait FlinkBase { if( config.getTest == false && config.getLocal == false) { val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setRestartStrategy(RestartStrategies.noRestart()); + env.setRestartStrategy(RestartStrategies.noRestart()) // start a checkpoint every hour env.enableCheckpointing(config.getCheckpointIntervalMs) @@ -169,10 +169,10 @@ trait FlinkBase { // allow only one checkpoint to be in progress at the 
same time env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) - return env + env } - else return StreamExecutionEnvironment.createLocalEnvironment(config.getParallel.toInt) + else StreamExecutionEnvironment.createLocalEnvironment(config.getParallel.toInt) } def buildReaderPath(configObject: HdfsReaderConfiguration) : String = { @@ -188,7 +188,7 @@ trait FlinkBase { } else { throw new Exception("scheme not recognized: " + configObject.getScheme) } - return inPathBuilder + inPathBuilder } def buildWriterPath(configObject: HdfsWriterConfiguration) : String = { @@ -204,15 +204,15 @@ trait FlinkBase { } else { throw new Exception("output scheme not recognized: " + configObject.getScheme) } - return outPathBuilder + outPathBuilder } def toProviderId(input : String) : String = { if( input.startsWith("@") ) return input.substring(1) if( input.contains(':')) - return input.substring(input.lastIndexOf(':')+1) - else return input + input.substring(input.lastIndexOf(':')+1) + else input } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/8d1c9fad/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala index 0e77ef8..a5a4f72 100644 --- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala +++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala @@ -43,12 +43,6 @@ import org.apache.flink.api.scala._ import 
scala.collection.JavaConversions._ -/** - * Created by sblackmon on 4/20/16. - */ -/** - * Created by sblackmon on 3/15/16. - */ object FlinkTwitterFollowingPipeline extends FlinkBase { val STREAMS_ID: String = "FlinkTwitterFollowingPipeline" @@ -59,7 +53,7 @@ object FlinkTwitterFollowingPipeline extends FlinkBase { override def main(args: Array[String]) = { super.main(args) val jobConfig = new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe) - if( setup(jobConfig) == false ) System.exit(1) + if( !setup(jobConfig) ) System.exit(1) val pipeline: FlinkTwitterFollowingPipeline = new FlinkTwitterFollowingPipeline(jobConfig) val thread = new Thread(pipeline) thread.start() @@ -100,7 +94,7 @@ object FlinkTwitterFollowingPipeline extends FlinkBase { Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey)) Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret)) - return true + true } @@ -114,7 +108,7 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration])) - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) env.setNumberOfExecutionRetries(0) val inPath = buildReaderPath(config.getSource) @@ -140,7 +134,7 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio jsons.addSink(new RollingSink[String](outPath)).setParallelism(3) else jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE) - .setParallelism(env.getParallelism); + .setParallelism(env.getParallelism) // if( test == true ) jsons.print(); 
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/8d1c9fad/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala index 6a070d0..8bb2997 100644 --- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala +++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala @@ -52,9 +52,6 @@ import org.apache.flink.api.scala._ import scala.collection.JavaConversions._ -/** - * Created by sblackmon on 7/29/15. - */ object FlinkTwitterPostsPipeline extends FlinkBase { val STREAMS_ID: String = "FlinkTwitterPostsPipeline" @@ -65,7 +62,7 @@ object FlinkTwitterPostsPipeline extends FlinkBase { override def main(args: Array[String]) = { super.main(args) val jobConfig = new ComponentConfigurator[TwitterPostsPipelineConfiguration](classOf[TwitterPostsPipelineConfiguration]).detectConfiguration(typesafe) - if( setup(jobConfig) == false ) System.exit(1) + if( !setup(jobConfig) ) System.exit(1) val pipeline: FlinkTwitterPostsPipeline = new FlinkTwitterPostsPipeline(jobConfig) val thread = new Thread(pipeline) thread.start() @@ -106,7 +103,7 @@ object FlinkTwitterPostsPipeline extends FlinkBase { Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey)) Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret)) - return true + true } @@ -120,7 +117,7 @@ class FlinkTwitterPostsPipeline(config: 
TwitterPostsPipelineConfiguration = new val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration])) - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) env.setNumberOfExecutionRetries(0) val inPath = buildReaderPath(config.getSource) @@ -148,7 +145,7 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new jsons.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs") else jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE) - .setParallelism(env.getParallelism); + .setParallelism(env.getParallelism) // if( test == true ) jsons.print(); http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/8d1c9fad/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala index 7e7cc5c..56d892b 100644 --- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala +++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala @@ -45,9 +45,6 @@ import org.apache.streams.twitter.converter.TwitterDateTimeFormat import scala.collection.JavaConversions._ -/** - * Created by sblackmon on 7/29/15. 
- */ object FlinkTwitterSpritzerPipeline extends FlinkBase { val STREAMS_ID: String = "FlinkTwitterSpritzerPipeline" @@ -58,7 +55,7 @@ object FlinkTwitterSpritzerPipeline extends FlinkBase { override def main(args: Array[String]) = { super.main(args) val jobConfig = new ComponentConfigurator(classOf[TwitterSpritzerPipelineConfiguration]).detectConfiguration(typesafe) - if( setup(jobConfig) == false ) System.exit(1) + if( !setup(jobConfig) ) System.exit(1) val pipeline: FlinkTwitterSpritzerPipeline = new FlinkTwitterSpritzerPipeline(jobConfig) val thread = new Thread(pipeline) thread.start() @@ -93,7 +90,7 @@ object FlinkTwitterSpritzerPipeline extends FlinkBase { Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey)) Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret)) - return true + true } @@ -109,18 +106,18 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration])) - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) env.setNumberOfExecutionRetries(0) val outPath = buildWriterPath(config.getDestination) - val streamSource : DataStream[String] = env.addSource(spritzerSource); + val streamSource : DataStream[String] = env.addSource(spritzerSource) if( config.getTest == false ) streamSource.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs") else streamSource.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE) - .setParallelism(env.getParallelism); + .setParallelism(env.getParallelism) // if( test == true ) jsons.print(); @@ -160,7 +157,7 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration } override def stop(): Unit = { - close(); + close() } } 
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/8d1c9fad/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala index 0ff8648..01425f6 100644 --- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala +++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala @@ -45,9 +45,6 @@ import org.slf4j.{Logger, LoggerFactory} import scala.collection.JavaConversions._ -/** - * Created by sblackmon on 3/15/16. 
- */ object FlinkTwitterUserInformationPipeline extends FlinkBase { val STREAMS_ID: String = "FlinkTwitterUserInformationPipeline" @@ -58,7 +55,7 @@ object FlinkTwitterUserInformationPipeline extends FlinkBase { override def main(args: Array[String]) = { super.main(args) val jobConfig = new ComponentConfigurator[TwitterUserInformationPipelineConfiguration](classOf[TwitterUserInformationPipelineConfiguration]).detectConfiguration(typesafe) - if( setup(jobConfig) == false ) System.exit(1) + if( !setup(jobConfig) ) System.exit(1) val pipeline: FlinkTwitterUserInformationPipeline = new FlinkTwitterUserInformationPipeline(jobConfig) val thread = new Thread(pipeline) thread.start() @@ -99,7 +96,7 @@ object FlinkTwitterUserInformationPipeline extends FlinkBase { Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey)) Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret)) - return true + true } @@ -113,7 +110,7 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration])) - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) env.setNumberOfExecutionRetries(0) val inPath = buildReaderPath(config.getSource) @@ -142,7 +139,7 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline jsons.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs") else jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE) - .setParallelism(env.getParallelism); + .setParallelism(env.getParallelism) LOGGER.info("StreamExecutionEnvironment: {}", env.toString ) @@ -150,7 +147,8 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline } class idListWindowFunction extends WindowFunction[String, List[String], Int, 
GlobalWindow] { - override def apply(key: Int, window: GlobalWindow, input: Iterable[String], out: Collector[List[String]]): Unit = {if( input.size > 0 ) + override def apply(key: Int, window: GlobalWindow, input: Iterable[String], out: Collector[List[String]]): Unit = { + if( input.nonEmpty ) out.collect(input.map(id => toProviderId(id)).toList) } }
