Repository: flink
Updated Branches:
  refs/heads/master 06c2c35a2 -> c28d96cd6


[FLINK-1344] [streaming] [scala] Added implicits from scala seq to datastream 
and static StreamExecutionEnvironment initialization


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ad45bf12
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ad45bf12
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ad45bf12

Branch: refs/heads/master
Commit: ad45bf1248ff4634490739d0ec7740e579ab9795
Parents: 06c2c35
Author: Paris Carbone <[email protected]>
Authored: Fri Jan 23 17:23:46 2015 +0100
Committer: Gyula Fora <[email protected]>
Committed: Wed Jan 28 11:25:40 2015 +0100

----------------------------------------------------------------------
 .../environment/StreamExecutionEnvironment.java | 25 +++++++++++++-------
 .../examples/windowing/TopSpeedWindowing.scala  | 10 ++++----
 .../flink/streaming/api/scala/package.scala     | 10 +++++---
 3 files changed, 27 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ad45bf12/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 352e0fd..2c10017 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -66,6 +66,8 @@ public abstract class StreamExecutionEnvironment {
 
        private ExecutionConfig config = new ExecutionConfig();
 
+       private static StreamExecutionEnvironment currentEnvironment;
+
        protected StreamGraph streamGraph;
 
        // 
--------------------------------------------------------------------------------------------
@@ -459,16 +461,20 @@ public abstract class StreamExecutionEnvironment {
         *         executed.
         */
        public static StreamExecutionEnvironment getExecutionEnvironment() {
+               if (currentEnvironment != null) {
+                       return currentEnvironment;
+               }
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                if (env instanceof ContextEnvironment) {
                        ContextEnvironment ctx = (ContextEnvironment) env;
-                       return createContextEnvironment(ctx.getClient(), 
ctx.getJars(),
+                       currentEnvironment = 
createContextEnvironment(ctx.getClient(), ctx.getJars(),
                                        ctx.getDegreeOfParallelism());
                } else if (env instanceof OptimizerPlanEnvironment | env 
instanceof PreviewPlanEnvironment) {
-                       return new StreamPlanEnvironment(env);
+                       currentEnvironment = new StreamPlanEnvironment(env);
                } else {
                        return createLocalEnvironment();
                }
+               return currentEnvironment;
        }
 
        private static StreamExecutionEnvironment 
createContextEnvironment(Client client,
@@ -502,9 +508,9 @@ public abstract class StreamExecutionEnvironment {
         *         parallelism.
         */
        public static LocalStreamEnvironment createLocalEnvironment(int 
degreeOfParallelism) {
-               LocalStreamEnvironment lee = new LocalStreamEnvironment();
-               lee.setDegreeOfParallelism(degreeOfParallelism);
-               return lee;
+               currentEnvironment = new LocalStreamEnvironment();
+               currentEnvironment.setDegreeOfParallelism(degreeOfParallelism);
+               return (LocalStreamEnvironment) currentEnvironment;
        }
 
        // TODO:fix cluster default parallelism
@@ -530,7 +536,8 @@ public abstract class StreamExecutionEnvironment {
         */
        public static StreamExecutionEnvironment createRemoteEnvironment(String 
host, int port,
                        String... jarFiles) {
-               return new RemoteStreamEnvironment(host, port, jarFiles);
+               currentEnvironment = new RemoteStreamEnvironment(host, port, 
jarFiles);
+               return currentEnvironment;
        }
 
        /**
@@ -556,9 +563,9 @@ public abstract class StreamExecutionEnvironment {
         */
        public static StreamExecutionEnvironment createRemoteEnvironment(String 
host, int port,
                        int degreeOfParallelism, String... jarFiles) {
-               RemoteStreamEnvironment rec = new RemoteStreamEnvironment(host, 
port, jarFiles);
-               rec.setDegreeOfParallelism(degreeOfParallelism);
-               return rec;
+               currentEnvironment = new RemoteStreamEnvironment(host, port, 
jarFiles);
+               currentEnvironment.setDegreeOfParallelism(degreeOfParallelism);
+               return currentEnvironment;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ad45bf12/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
 
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
index 8264b7f..1091aa3 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
+++ 
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
@@ -44,9 +44,7 @@ object TopSpeedWindowing {
       return
     }
 
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val cars = env.fromCollection(genCarStream())
-      .groupBy("carId")
+    val cars = genCarStream().groupBy("carId")
       .window(Time.of(evictionSec, SECONDS))
       .every(Delta.of[CarEvent](triggerMeters,
           (oldSp,newSp) => newSp.distance-oldSp.distance, CarEvent(0,0,0,0)))
@@ -54,15 +52,15 @@ object TopSpeedWindowing {
 
     cars print
 
-    env.execute("TopSpeedWindowing")
+    
StreamExecutionEnvironment.getExecutionEnvironment.execute("TopSpeedWindowing")
 
   }
 
-  def genCarStream(): Stream[CarEvent] = {
+  def genCarStream(): DataStream[CarEvent] = {
 
     def nextSpeed(carEvent : CarEvent) : CarEvent =
     {
-      val next = 
+      val next =
         if (Random.nextBoolean) min(100, carEvent.speed + 5) else max(0, 
carEvent.speed - 5)
       CarEvent(carEvent.carId, next, carEvent.distance + 
next/3.6d,System.currentTimeMillis)
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/ad45bf12/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
 
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
index 2af370b..222eb6d 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
+++ 
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
@@ -31,7 +31,7 @@ package object scala {
   // We have this here so that we always have generated TypeInformationS when
   // using the Scala API
   implicit def createTypeInformation[T]: TypeInformation[T] = macro 
TypeUtils.createTypeInfo[T]
-  
+
   implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] =
     new DataStream[R](javaStream)
 
@@ -41,10 +41,14 @@ package object scala {
   implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): 
SplitDataStream[R] =
     new SplitDataStream[R](javaStream)
 
-  implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: 
JavaConStream[IN1, IN2]): 
+  implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: 
JavaConStream[IN1, IN2]):
   ConnectedDataStream[IN1, IN2] = new ConnectedDataStream[IN1, IN2](javaStream)
 
-   private[flink] def fieldNames2Indices(
+  implicit def seqToFlinkSource[T: ClassTag: TypeInformation](scalaSeq: 
Seq[T]) : DataStream[T] =
+    StreamExecutionEnvironment.getExecutionEnvironment.fromCollection(scalaSeq)
+
+
+  private[flink] def fieldNames2Indices(
       typeInfo: TypeInformation[_],
       fields: Array[String]): Array[Int] = {
     typeInfo match {

Reply via email to