Repository: flink
Updated Branches:
  refs/heads/master dea417260 -> 91eea376e


[FLINK-8571] [DataStream] Introduce utility function that reinterprets a data 
stream as keyed stream

This closes #5424.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/91eea376
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/91eea376
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/91eea376

Branch: refs/heads/master
Commit: 91eea376eee1ca2612b0f0603511b5eb52f11195
Parents: bfe6f84
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Fri Feb 2 18:39:51 2018 +0100
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Fri Feb 9 18:23:37 2018 +0100

----------------------------------------------------------------------
 docs/dev/api_concepts.md                        |   1 -
 docs/dev/stream/experimental.md                 |  79 +++++++
 .../api/datastream/DataStreamUtils.java         |  62 ++++-
 .../streaming/api/datastream/KeyedStream.java   |  33 ++-
 ...einterpretDataStreamAsKeyedStreamITCase.java | 231 +++++++++++++++++++
 .../streaming/api/scala/DataStreamUtils.scala   |  25 ++
 .../streaming/api/scala/DataStreamTest.scala    |   4 +-
 ...interpretDataStreamAsKeyedStreamITCase.scala |  40 ++++
 8 files changed, 468 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/91eea376/docs/dev/api_concepts.md
----------------------------------------------------------------------
diff --git a/docs/dev/api_concepts.md b/docs/dev/api_concepts.md
index c675225..b336fce 100644
--- a/docs/dev/api_concepts.md
+++ b/docs/dev/api_concepts.md
@@ -895,4 +895,3 @@ result type ```R``` for the final result. E.g. for a histogram, ```V``` is a num
 a histogram. ```SimpleAccumulator``` is for the cases where both types are the same, e.g. for counters.
 
 {% top %}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/91eea376/docs/dev/stream/experimental.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/experimental.md b/docs/dev/stream/experimental.md
new file mode 100644
index 0000000..db029be
--- /dev/null
+++ b/docs/dev/stream/experimental.md
@@ -0,0 +1,79 @@
+---
+title: "Experimental Features"
+nav-id: experimental_features
+nav-show_overview: true
+nav-parent_id: streaming
+nav-pos: 100
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+This section describes experimental features in the DataStream API. Experimental features are still evolving and can be unstable,
+incomplete, or subject to heavy change in future versions.
+
+Reinterpreting a pre-partitioned data stream as keyed stream
+------------------------------------------------------------
+
+We can re-interpret a pre-partitioned data stream as a keyed stream to avoid shuffling.
+
+**WARNING**: The re-interpreted data stream **MUST** already be pre-partitioned in **EXACTLY** the same way Flink's keyBy would partition
+the data in a shuffle w.r.t. key-group assignment.
+
+One use-case for this could be a materialized shuffle between two jobs: the first job performs a keyBy shuffle and materializes
+each output into a partition. A second job has sources that, for each parallel instance, read from the corresponding partitions
+created by the first job. Those sources can now be re-interpreted as keyed streams, e.g. to apply windowing. Notice that this trick
+makes the second job embarrassingly parallel, which can be helpful for a fine-grained recovery scheme.
+
+This re-interpretation functionality is exposed through `DataStreamUtils`:
+
+{% highlight java %}
+       static <T, K> KeyedStream<T, K> reinterpretAsKeyedStream(
+               DataStream<T> stream,
+               KeySelector<T, K> keySelector,
+               TypeInformation<K> typeInfo)
+{% endhighlight %}
+
+Given a base stream, a key selector, and type information,
+the method creates a keyed stream from the base stream.
+
+Code example:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStreamSource<Integer> source = ...
+        DataStreamUtils.reinterpretAsKeyedStream(source, (in) -> in, TypeInformation.of(Integer.class))
+            .timeWindow(Time.seconds(1))
+            .reduce((a, b) -> a + b)
+            .addSink(new DiscardingSink<>());
+        env.execute();
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val source = ...
+    new DataStreamUtils(source).reinterpretAsKeyedStream((in) => in)
+      .timeWindow(Time.seconds(1))
+      .reduce((a, b) => a + b)
+      .addSink(new DiscardingSink[Int])
+    env.execute()
+{% endhighlight %}
+</div>
+</div>

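For context, the **EXACTLY** requirement above refers to Flink's key-group mechanics: a key is hashed into one of `maxParallelism` key groups, and each key group maps to one parallel operator instance. The stand-alone sketch below illustrates the scheme; `KeyGroupSketch` is a made-up name, and the bit-mixing hash is only illustrative (Flink actually uses `MathUtils.murmurHash` via `KeyGroupRangeAssignment`), while the key-group-to-operator-index formula mirrors `KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup`:

```java
// Sketch of Flink's key-group based partitioning (names and hash are
// illustrative only; not Flink's exact MathUtils.murmurHash).
public class KeyGroupSketch {

    // A key is first mapped to one of maxParallelism key groups.
    static int keyGroupForKey(Object key, int maxParallelism) {
        int h = key.hashCode();
        h ^= (h >>> 16); // illustrative bit mixing, stand-in for murmur hash
        return (h & 0x7fffffff) % maxParallelism;
    }

    // A key group is then mapped to a parallel operator instance; this
    // formula mirrors KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 2;
        // Key groups split evenly across the two operator instances.
        System.out.println(operatorIndexForKeyGroup(maxParallelism, parallelism, 0));   // 0
        System.out.println(operatorIndexForKeyGroup(maxParallelism, parallelism, 63));  // 0
        System.out.println(operatorIndexForKeyGroup(maxParallelism, parallelism, 64));  // 1
        System.out.println(operatorIndexForKeyGroup(maxParallelism, parallelism, 127)); // 1
    }
}
```

A stream qualifies for `reinterpretAsKeyedStream` only if every record already sits on the operator instance that this mapping would choose for its key.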
http://git-wip-us.apache.org/repos/asf/flink/blob/91eea376/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
index d145d6f..351c456 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
@@ -18,13 +18,18 @@
 package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.runtime.net.ConnectionUtils;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.experimental.CollectSink;
 import org.apache.flink.streaming.experimental.SocketStreamIterator;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -33,7 +38,7 @@ import java.net.UnknownHostException;
 import java.util.Iterator;
 
 /**
- * A collection of experimental utilities for {@link DataStream DataStreams}.
+ * A collection of utilities for {@link DataStream DataStreams}.
  *
  * <p>This experimental class is relocated from flink-streaming-contrib. Please see package-info.java
  * for more information.
@@ -85,6 +90,61 @@ public final class DataStreamUtils {
                return iter;
        }
 
+       /**
+        * Reinterprets the given {@link DataStream} as a {@link KeyedStream}, which extracts keys with the given
+        * {@link KeySelector}.
+        *
+        * <p>IMPORTANT: For every partition of the base stream, the keys of events in the base stream must be
+        * partitioned exactly in the same way as if it was created through a {@link DataStream#keyBy(KeySelector)}.
+        *
+        * @param stream      The data stream to reinterpret. For every partition, this stream must be partitioned exactly
+        *                    in the same way as if it was created through a {@link DataStream#keyBy(KeySelector)}.
+        * @param keySelector Function that defines how keys are extracted from the data stream.
+        * @param <T>         Type of events in the data stream.
+        * @param <K>         Type of the extracted keys.
+        * @return The reinterpretation of the {@link DataStream} as a {@link KeyedStream}.
+        */
+       public static <T, K> KeyedStream<T, K> reinterpretAsKeyedStream(
+               DataStream<T> stream,
+               KeySelector<T, K> keySelector) {
+
+               return reinterpretAsKeyedStream(
+                       stream,
+                       keySelector,
+                       TypeExtractor.getKeySelectorTypes(keySelector, stream.getType()));
+       }
+
+       /**
+        * Reinterprets the given {@link DataStream} as a {@link KeyedStream}, which extracts keys with the given
+        * {@link KeySelector}.
+        *
+        * <p>IMPORTANT: For every partition of the base stream, the keys of events in the base stream must be
+        * partitioned exactly in the same way as if it was created through a {@link DataStream#keyBy(KeySelector)}.
+        *
+        * @param stream      The data stream to reinterpret. For every partition, this stream must be partitioned exactly
+        *                    in the same way as if it was created through a {@link DataStream#keyBy(KeySelector)}.
+        * @param keySelector Function that defines how keys are extracted from the data stream.
+        * @param typeInfo    Explicit type information about the key type.
+        * @param <T>         Type of events in the data stream.
+        * @param <K>         Type of the extracted keys.
+        * @return The reinterpretation of the {@link DataStream} as a {@link KeyedStream}.
+        */
+       public static <T, K> KeyedStream<T, K> reinterpretAsKeyedStream(
+               DataStream<T> stream,
+               KeySelector<T, K> keySelector,
+               TypeInformation<K> typeInfo) {
+
+               PartitionTransformation<T> partitionTransformation = new PartitionTransformation<>(
+                       stream.getTransformation(),
+                       new ForwardPartitioner<>());
+
+               return new KeyedStream<>(
+                       stream,
+                       partitionTransformation,
+                       keySelector,
+                       typeInfo);
+       }
+
        private static class CallExecute extends Thread {
 
                private final StreamExecutionEnvironment toTrigger;

http://git-wip-us.apache.org/repos/asf/flink/blob/91eea376/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index ebcd7d5..7beaa03 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -120,12 +120,37 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
         *            Function for determining state partitions
         */
        public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
-               super(
-                       dataStream.getExecutionEnvironment(),
+               this(
+                       dataStream,
                        new PartitionTransformation<>(
                                dataStream.getTransformation(),
-                               new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)));
-               this.keySelector = keySelector;
+                               new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),
+                       keySelector,
+                       keyType);
+       }
+
+       /**
+        * Creates a new {@link KeyedStream} using the given {@link KeySelector} and {@link TypeInformation}
+        * to partition operator state by key, where the partitioning is defined by a {@link PartitionTransformation}.
+        *
+        * @param stream
+        *            Base stream of data
+        * @param partitionTransformation
+        *            Function that determines how the keys are distributed to downstream operator(s)
+        * @param keySelector
+        *            Function to extract keys from the base stream
+        * @param keyType
+        *            Defines the type of the extracted keys
+        */
+       @Internal
+       KeyedStream(
+               DataStream<T> stream,
+               PartitionTransformation<T> partitionTransformation,
+               KeySelector<T, KEY> keySelector,
+               TypeInformation<KEY> keyType) {
+
+               super(stream.getExecutionEnvironment(), partitionTransformation);
+               this.keySelector = clean(keySelector);
                this.keyType = validateKeyType(keyType);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/91eea376/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase.java
new file mode 100644
index 0000000..fc8e997
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Integration test for {@link DataStreamUtils#reinterpretAsKeyedStream(DataStream, KeySelector, TypeInformation)}.
+ */
+public class ReinterpretDataStreamAsKeyedStreamITCase {
+
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       /**
+        * This test checks that reinterpreting a data stream as a keyed stream works as expected. This test consists of
+        * two jobs. The first job materializes a keyBy into files, one file per partition. The second job opens the
+        * files created by the first job as sources (doing the correct assignment of files to partitions) and
+        * reinterprets the sources as keyed, because we know they have been partitioned in a keyBy from the first job.
+        */
+       @Test
+       public void testReinterpretAsKeyedStream() throws Exception {
+
+               final int numEventsPerInstance = 100;
+               final int maxParallelism = 8;
+               final int parallelism = 3;
+               final int numUniqueKeys = 12;
+
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+               env.setMaxParallelism(maxParallelism);
+               env.setParallelism(parallelism);
+
+               final List<File> partitionFiles = new ArrayList<>(parallelism);
+               for (int i = 0; i < parallelism; ++i) {
+                       File partitionFile = temporaryFolder.newFile();
+                       partitionFiles.add(i, partitionFile);
+               }
+
+               env.addSource(new RandomTupleSource(numEventsPerInstance, numUniqueKeys))
+                       .keyBy(0)
+                       .addSink(new ToPartitionFileSink(partitionFiles));
+
+               env.execute();
+
+               DataStreamUtils.reinterpretAsKeyedStream(
+                       env.addSource(new FromPartitionFileSource(partitionFiles)),
+                       (KeySelector<Tuple2<Integer, Integer>, Integer>) value -> value.f0,
+                       TypeInformation.of(Integer.class))
+                       .timeWindow(Time.seconds(1)) // test that also timers and aggregated state work as expected
+                       .reduce((ReduceFunction<Tuple2<Integer, Integer>>) (value1, value2) ->
+                               new Tuple2<>(value1.f0, value1.f1 + value2.f1))
+                       .addSink(new ValidatingSink(numEventsPerInstance * parallelism)).setParallelism(1);
+
+               env.execute();
+       }
+
+       private static class RandomTupleSource implements ParallelSourceFunction<Tuple2<Integer, Integer>> {
+               private static final long serialVersionUID = 1L;
+
+               private int numKeys;
+               private int remainingEvents;
+
+               public RandomTupleSource(int numEvents, int numKeys) {
+                       this.numKeys = numKeys;
+                       this.remainingEvents = numEvents;
+               }
+
+               @Override
+               public void run(SourceContext<Tuple2<Integer, Integer>> out) throws Exception {
+                       Random random = new Random(42);
+                       while (--remainingEvents >= 0) {
+                               out.collect(new Tuple2<>(random.nextInt(numKeys), 1));
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       this.remainingEvents = 0;
+               }
+       }
+
+       private static class ToPartitionFileSink extends RichSinkFunction<Tuple2<Integer, Integer>> {
+
+               private static final long serialVersionUID = 1L;
+
+               private final List<File> allPartitions;
+               private DataOutputStream dos;
+
+               public ToPartitionFileSink(List<File> allPartitions) {
+                       this.allPartitions = allPartitions;
+               }
+
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       super.open(parameters);
+                       int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
+                       dos = new DataOutputStream(
+                               new BufferedOutputStream(
+                                       new FileOutputStream(allPartitions.get(subtaskIdx))));
+               }
+
+               @Override
+               public void close() throws Exception {
+                       super.close();
+                       dos.close();
+               }
+
+               @Override
+               public void invoke(Tuple2<Integer, Integer> value, Context context) throws Exception {
+                       dos.writeInt(value.f0);
+                       dos.writeInt(value.f1);
+               }
+       }
+
+       private static class FromPartitionFileSource extends RichParallelSourceFunction<Tuple2<Integer, Integer>> {
+               private static final long serialVersionUID = 1L;
+
+               private List<File> allPartitions;
+               private DataInputStream din;
+               private volatile boolean running;
+
+               public FromPartitionFileSource(List<File> allPartitions) {
+                       this.allPartitions = allPartitions;
+               }
+
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       super.open(parameters);
+                       int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
+                       din = new DataInputStream(
+                               new BufferedInputStream(
+                                       new FileInputStream(allPartitions.get(subtaskIdx))));
+               }
+
+               @Override
+               public void close() throws Exception {
+                       super.close();
+                       din.close();
+               }
+
+               @Override
+               public void run(SourceContext<Tuple2<Integer, Integer>> out) throws Exception {
+                       this.running = true;
+                       try {
+                               while (running) {
+                                       Integer key = din.readInt();
+                                       Integer val = din.readInt();
+                                       out.collect(new Tuple2<>(key, val));
+                               }
+                       } catch (EOFException ignore) {
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       this.running = false;
+               }
+       }
+
+       private static class ValidatingSink extends RichSinkFunction<Tuple2<Integer, Integer>> {
+
+               private static final long serialVersionUID = 1L;
+               private final int expectedSum;
+               private int runningSum = 0;
+
+               private ValidatingSink(int expectedSum) {
+                       this.expectedSum = expectedSum;
+               }
+
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       super.open(parameters);
+                       Preconditions.checkState(getRuntimeContext().getNumberOfParallelSubtasks() == 1);
+               }
+
+               @Override
+               public void invoke(Tuple2<Integer, Integer> value, Context context) throws Exception {
+                       runningSum += value.f1;
+               }
+
+               @Override
+               public void close() throws Exception {
+                       Assert.assertEquals(expectedSum, runningSum);
+                       super.close();
+               }
+       }
+}
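The materialization format in the test above is deliberately simple: each partition file is a flat sequence of `(key, value)` int pairs written with `DataOutputStream` by `ToPartitionFileSink` and read back by `FromPartitionFileSource` until `EOFException`. The following self-contained round-trip sketch of that format uses in-memory streams; the class and method names are made up for illustration:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Round-trip of the partition-file format used by the test above:
// each record is a pair of ints (key, value), read back until EOF.
public class PartitionFileFormatSketch {

    static byte[] writeRecords(List<int[]> records) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bos)) {
            for (int[] kv : records) {
                dos.writeInt(kv[0]); // key
                dos.writeInt(kv[1]); // value
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    static List<int[]> readRecords(byte[] bytes) {
        List<int[]> out = new ArrayList<>();
        DataInputStream din = new DataInputStream(new ByteArrayInputStream(bytes));
        try {
            while (true) {
                out.add(new int[] {din.readInt(), din.readInt()});
            }
        } catch (EOFException endOfPartition) {
            // same termination condition as FromPartitionFileSource
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out;
    }

    public static void main(String[] args) {
        List<int[]> in = new ArrayList<>();
        in.add(new int[] {7, 1});
        in.add(new int[] {3, 1});
        byte[] bytes = writeRecords(in);
        List<int[]> back = readRecords(bytes);
        System.out.println(back.size()); // 2
    }
}
```

Because the first job writes one such file per keyBy partition, reading file *i* in subtask *i* of the second job reproduces the original key-group placement, which is what makes the reinterpretation safe.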

http://git-wip-us.apache.org/repos/asf/flink/blob/91eea376/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala
index 74dd66a..0f42aa2 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala
@@ -43,5 +43,30 @@ class DataStreamUtils[T: TypeInformation : ClassTag](val self: DataStream[T]) {
   def collect() : Iterator[T] = {
     JavaStreamUtils.collect(self.javaStream).asScala
   }
+
+  /**
+    * Reinterprets the given [[DataStream]] as a [[KeyedStream]], which extracts keys with the
+    * given [[KeySelectorWithType]].
+    *
+    * IMPORTANT: For every partition of the base stream, the keys of events in the base stream
+    * must be partitioned exactly in the same way as if it was created through a
+    * [[DataStream#keyBy(KeySelectorWithType)]].
+    *
+    * @param keySelector Function that defines how keys are extracted from the data stream.
+    * @return The reinterpretation of the [[DataStream]] as a [[KeyedStream]].
+    */
+  def reinterpretAsKeyedStream[K: TypeInformation](
+        keySelector: T => K): KeyedStream[T, K] = {
+
+    val keySelectorWithType =
+      new KeySelectorWithType[T, K](clean(keySelector), implicitly[TypeInformation[K]])
+
+    asScalaStream(
+      JavaStreamUtils.reinterpretAsKeyedStream(self.javaStream, keySelectorWithType))
+  }
+
+  private[flink] def clean[F <: AnyRef](f: F): F = {
+    new StreamExecutionEnvironment(self.javaStream.getExecutionEnvironment).scalaClean(f)
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/91eea376/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 805eb41..e2c5b41 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -25,17 +25,19 @@ import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.streaming.api.functions.co.CoMapFunction
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink
 import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph}
 import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, KeyedProcessOperator, ProcessOperator, StreamOperator}
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
+import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
 import org.apache.flink.streaming.runtime.partitioner._
 import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.util.Collector
 import org.junit.Assert._
-import org.junit.{Rule, Test}
 import org.junit.rules.ExpectedException
+import org.junit.{Rule, Test}
 
 class DataStreamTest extends AbstractTestBase {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/91eea376/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ReinterpretDataStreamAsKeyedStreamITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ReinterpretDataStreamAsKeyedStreamITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ReinterpretDataStreamAsKeyedStreamITCase.scala
new file mode 100644
index 0000000..0957424
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ReinterpretDataStreamAsKeyedStreamITCase.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.junit.Test
+
+/**
+  * Integration test for [[DataStreamUtils.reinterpretAsKeyedStream()]].
+  */
+class ReinterpretDataStreamAsKeyedStreamITCase {
+
+  @Test
+  def testReinterpretAsKeyedStream(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val source = env.fromElements(1, 2, 3)
+    new DataStreamUtils(source).reinterpretAsKeyedStream((in) => in)
+      .timeWindow(Time.seconds(1))
+      .reduce((a, b) => a + b)
+      .addSink(new DiscardingSink[Int])
+    env.execute()
+  }
+}
