alpinegizmo commented on a change in pull request #31:
URL: https://github.com/apache/flink-training/pull/31#discussion_r698872951
##########
File path:
hourly-tips/src/main/scala/org/apache/flink/training/exercises/hourlytips/scala/HourlyTipsExercise.scala
##########
@@ -18,36 +18,47 @@
package org.apache.flink.training.exercises.hourlytips.scala
+import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.streaming.api.functions.sink.{PrintSinkFunction,
SinkFunction}
+import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
+import org.apache.flink.training.exercises.common.datatypes.TaxiFare
import org.apache.flink.training.exercises.common.sources.TaxiFareGenerator
-import org.apache.flink.training.exercises.common.utils.ExerciseBase._
-import org.apache.flink.training.exercises.common.utils.{ExerciseBase,
MissingSolutionException}
+import
org.apache.flink.training.exercises.common.utils.MissingSolutionException
/**
- * The "Hourly Tips" exercise of the Flink training in the docs.
+ * The Hourly Tips exercise from the Flink training.
*
- * The task of the exercise is to first calculate the total tips collected by
each driver, hour by hour, and
- * then from that stream, find the highest tip total in each hour.
+ * The task of the exercise is to first calculate the total tips collected by
each driver,
+ * hour by hour, and then from that stream, find the highest tip total in
each hour.
*
*/
object HourlyTipsExercise {
- def main(args: Array[String]) {
+ @throws[Exception]
+ def main(args: Array[String]): Unit = {
+ val job = new HourlyTipsJob(new TaxiFareGenerator, new PrintSinkFunction)
- // set up streaming execution environment
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setParallelism(ExerciseBase.parallelism)
+ job.execute()
+ }
- // start the data generator
- val fares = env.addSource(fareSourceOrTest(new TaxiFareGenerator()))
+ class HourlyTipsJob(source: SourceFunction[TaxiFare], sink:
SinkFunction[(Long, Long, Float)]) {
Review comment:
yes, done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]