Repository: flink Updated Branches: refs/heads/master 06c2c35a2 -> c28d96cd6
[FLINK-1344] [streaming] [scala] Added implicits from scala seq to datastream and static StreamExecutionEnvironment initialization Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ad45bf12 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ad45bf12 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ad45bf12 Branch: refs/heads/master Commit: ad45bf1248ff4634490739d0ec7740e579ab9795 Parents: 06c2c35 Author: Paris Carbone <[email protected]> Authored: Fri Jan 23 17:23:46 2015 +0100 Committer: Gyula Fora <[email protected]> Committed: Wed Jan 28 11:25:40 2015 +0100 ---------------------------------------------------------------------- .../environment/StreamExecutionEnvironment.java | 25 +++++++++++++------- .../examples/windowing/TopSpeedWindowing.scala | 10 ++++---- .../flink/streaming/api/scala/package.scala | 10 +++++--- 3 files changed, 27 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ad45bf12/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 352e0fd..2c10017 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -66,6 +66,8 @@ public abstract class StreamExecutionEnvironment { private ExecutionConfig config = new ExecutionConfig(); + private static StreamExecutionEnvironment currentEnvironment; + protected StreamGraph streamGraph; // -------------------------------------------------------------------------------------------- @@ -459,16 +461,20 @@ public abstract class StreamExecutionEnvironment { * executed. */ public static StreamExecutionEnvironment getExecutionEnvironment() { + if (currentEnvironment != null) { + return currentEnvironment; + } ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); if (env instanceof ContextEnvironment) { ContextEnvironment ctx = (ContextEnvironment) env; - return createContextEnvironment(ctx.getClient(), ctx.getJars(), + currentEnvironment = createContextEnvironment(ctx.getClient(), ctx.getJars(), ctx.getDegreeOfParallelism()); } else if (env instanceof OptimizerPlanEnvironment | env instanceof PreviewPlanEnvironment) { - return new StreamPlanEnvironment(env); + currentEnvironment = new StreamPlanEnvironment(env); } else { return createLocalEnvironment(); } + return currentEnvironment; } private static StreamExecutionEnvironment createContextEnvironment(Client client, @@ -502,9 +508,9 @@ public abstract class StreamExecutionEnvironment { * parallelism. */ public static LocalStreamEnvironment createLocalEnvironment(int degreeOfParallelism) { - LocalStreamEnvironment lee = new LocalStreamEnvironment(); - lee.setDegreeOfParallelism(degreeOfParallelism); - return lee; + currentEnvironment = new LocalStreamEnvironment(); + currentEnvironment.setDegreeOfParallelism(degreeOfParallelism); + return (LocalStreamEnvironment) currentEnvironment; } // TODO:fix cluster default parallelism @@ -530,7 +536,8 @@ public abstract class StreamExecutionEnvironment { */ public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, String... jarFiles) { - return new RemoteStreamEnvironment(host, port, jarFiles); + currentEnvironment = new RemoteStreamEnvironment(host, port, jarFiles); + return currentEnvironment; } /** @@ -556,9 +563,9 @@ public abstract class StreamExecutionEnvironment { */ public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, int degreeOfParallelism, String... jarFiles) { - RemoteStreamEnvironment rec = new RemoteStreamEnvironment(host, port, jarFiles); - rec.setDegreeOfParallelism(degreeOfParallelism); - return rec; + currentEnvironment = new RemoteStreamEnvironment(host, port, jarFiles); + currentEnvironment.setDegreeOfParallelism(degreeOfParallelism); + return currentEnvironment; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/ad45bf12/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala index 8264b7f..1091aa3 100644 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala +++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala @@ -44,9 +44,7 @@ object TopSpeedWindowing { return } - val env = StreamExecutionEnvironment.getExecutionEnvironment - val cars = env.fromCollection(genCarStream()) - .groupBy("carId") + val cars = genCarStream().groupBy("carId") .window(Time.of(evictionSec, SECONDS)) .every(Delta.of[CarEvent](triggerMeters, (oldSp,newSp) => newSp.distance-oldSp.distance, CarEvent(0,0,0,0))) @@ -54,15 +52,15 @@ object TopSpeedWindowing { cars print - env.execute("TopSpeedWindowing") + StreamExecutionEnvironment.getExecutionEnvironment.execute("TopSpeedWindowing") } - def genCarStream(): Stream[CarEvent] = { + def genCarStream(): DataStream[CarEvent] = { def nextSpeed(carEvent : CarEvent) : CarEvent = { - val next = + val next = if (Random.nextBoolean) min(100, carEvent.speed + 5) else max(0, carEvent.speed - 5) CarEvent(carEvent.carId, next, carEvent.distance + next/3.6d,System.currentTimeMillis) } http://git-wip-us.apache.org/repos/asf/flink/blob/ad45bf12/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala index 2af370b..222eb6d 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala @@ -31,7 +31,7 @@ package object scala { // We have this here so that we always have generated TypeInformationS when // using the Scala API implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T] - + implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] = new DataStream[R](javaStream) @@ -41,10 +41,14 @@ package object scala { implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitDataStream[R] = new SplitDataStream[R](javaStream) - implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: JavaConStream[IN1, IN2]): + implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: JavaConStream[IN1, IN2]): ConnectedDataStream[IN1, IN2] = new ConnectedDataStream[IN1, IN2](javaStream) - private[flink] def fieldNames2Indices( + implicit def seqToFlinkSource[T: ClassTag: TypeInformation](scalaSeq: Seq[T]) : DataStream[T] = + StreamExecutionEnvironment.getExecutionEnvironment.fromCollection(scalaSeq) + + + private[flink] def fieldNames2Indices( typeInfo: TypeInformation[_], fields: Array[String]): Array[Int] = { typeInfo match {
