Repository: flink
Updated Branches:
  refs/heads/release-1.4 33ebc85c2 -> beff62d2e


[FLINK-8571] [DataStream] Introduce utility function that reinterprets a data stream as keyed stream (backport from 1.5 branch)

This closes #5439.
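
For reference, a minimal usage sketch of the new utility (hedged: 'prePartitioned' stands for a hypothetical DataStream whose partitions already match exactly what keyBy on the same key would produce; guaranteeing that precondition is the caller's responsibility):

    // Reuse the existing partitioning instead of shuffling a second time.
    KeyedStream<Tuple2<Integer, Integer>, Integer> keyed =
            DataStreamUtils.reinterpretAsKeyedStream(
                    prePartitioned,
                    new KeySelector<Tuple2<Integer, Integer>, Integer>() {
                        @Override
                        public Integer getKey(Tuple2<Integer, Integer> value) {
                            return value.f0;
                        }
                    });

    // Keyed operations such as time windows now work without a network shuffle.
    keyed.timeWindow(Time.seconds(1))
            .reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1))
            .print();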


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/beff62d2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/beff62d2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/beff62d2

Branch: refs/heads/release-1.4
Commit: beff62d2e6e41b9406ac02ec8b206c80b9df50ed
Parents: 33ebc85
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Fri Feb 9 11:30:37 2018 +0100
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Fri Feb 9 17:36:20 2018 +0100

----------------------------------------------------------------------
 .../contrib/streaming/DataStreamUtils.java      |  61 +++++
 .../contrib/streaming/scala/utils/package.scala |  25 +-
 .../ReinterpretAsKeyedStreamITCase.java         | 233 +++++++++++++++++++
 ...interpretDataStreamAsKeyedStreamITCase.scala |  42 ++++
 .../streaming/api/datastream/KeyedStream.java   |  33 ++-
 5 files changed, 389 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/beff62d2/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
index 430c98c..6d0055c 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
@@ -17,13 +17,19 @@
 
 package org.apache.flink.contrib.streaming;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.runtime.net.ConnectionUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -80,6 +86,61 @@ public final class DataStreamUtils {
                return iter;
        }
 
+       /**
+        * Reinterprets the given {@link DataStream} as a {@link KeyedStream}, which extracts keys with the given
+        * {@link KeySelector}.
+        *
+        * <p>IMPORTANT: For every partition of the base stream, the keys of events in the base stream must be
+        * partitioned exactly in the same way as if it was created through a {@link DataStream#keyBy(KeySelector)}.
+        *
+        * @param stream      The data stream to reinterpret. For every partition, this stream must be partitioned exactly
+        *                    in the same way as if it was created through a {@link DataStream#keyBy(KeySelector)}.
+        * @param keySelector Function that defines how keys are extracted from the data stream.
+        * @param <T>         Type of events in the data stream.
+        * @param <K>         Type of the extracted keys.
+        * @return The reinterpretation of the {@link DataStream} as a {@link KeyedStream}.
+        */
+       public static <T, K> KeyedStream<T, K> reinterpretAsKeyedStream(
+               DataStream<T> stream,
+               KeySelector<T, K> keySelector) {
+
+               return reinterpretAsKeyedStream(
+                       stream,
+                       keySelector,
+                       TypeExtractor.getKeySelectorTypes(keySelector, stream.getType()));
+       }
+
+       /**
+        * Reinterprets the given {@link DataStream} as a {@link KeyedStream}, which extracts keys with the given
+        * {@link KeySelector}.
+        *
+        * <p>IMPORTANT: For every partition of the base stream, the keys of events in the base stream must be
+        * partitioned exactly in the same way as if it was created through a {@link DataStream#keyBy(KeySelector)}.
+        *
+        * @param stream      The data stream to reinterpret. For every partition, this stream must be partitioned exactly
+        *                    in the same way as if it was created through a {@link DataStream#keyBy(KeySelector)}.
+        * @param keySelector Function that defines how keys are extracted from the data stream.
+        * @param typeInfo    Explicit type information about the key type.
+        * @param <T>         Type of events in the data stream.
+        * @param <K>         Type of the extracted keys.
+        * @return The reinterpretation of the {@link DataStream} as a {@link KeyedStream}.
+        */
+       public static <T, K> KeyedStream<T, K> reinterpretAsKeyedStream(
+               DataStream<T> stream,
+               KeySelector<T, K> keySelector,
+               TypeInformation<K> typeInfo) {
+
+               PartitionTransformation<T> partitionTransformation = new PartitionTransformation<>(
+                       stream.getTransformation(),
+                       new ForwardPartitioner<>());
+
+               return new KeyedStream<>(
+                       stream,
+                       partitionTransformation,
+                       keySelector,
+                       typeInfo);
+       }
+
        private static class CallExecute extends Thread {
 
                private final StreamExecutionEnvironment toTrigger;

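
A short hedged sketch of the overload above that takes explicit type information, under the same pre-partitioning assumption ('prePartitioned' is again hypothetical). Passing the key TypeInformation explicitly helps when the KeySelector is a Java 8 lambda, whose key type the TypeExtractor cannot always derive; the ITCase further down does exactly this:

    DataStreamUtils.reinterpretAsKeyedStream(
                    prePartitioned,
                    (KeySelector<Tuple2<Integer, Integer>, Integer>) value -> value.f0,
                    TypeInformation.of(Integer.class))
            .timeWindow(Time.seconds(1))
            .reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1))
            .addSink(new DiscardingSink<>());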
http://git-wip-us.apache.org/repos/asf/flink/blob/beff62d2/flink-contrib/flink-streaming-contrib/src/main/scala/org/apache/flink/contrib/streaming/scala/utils/package.scala
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/scala/org/apache/flink/contrib/streaming/scala/utils/package.scala b/flink-contrib/flink-streaming-contrib/src/main/scala/org/apache/flink/contrib/streaming/scala/utils/package.scala
index 86a2bdc..a2dcb18 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/scala/org/apache/flink/contrib/streaming/scala/utils/package.scala
+++ b/flink-contrib/flink-streaming-contrib/src/main/scala/org/apache/flink/contrib/streaming/scala/utils/package.scala
@@ -43,6 +43,29 @@ package object utils {
       JavaStreamUtils.collect(self.javaStream).asScala
     }
 
-  }
+    /**
+      * Reinterprets the given [[DataStream]] as a [[KeyedStream]], which extracts keys with the
+      * given [[KeySelectorWithType]].
+      *
+      * IMPORTANT: For every partition of the base stream, the keys of events in the base stream
+      * must be partitioned exactly in the same way as if it was created through a
+      * [[DataStream#keyBy(KeySelectorWithType)]].
+      *
+      * @param keySelector Function that defines how keys are extracted from the data stream.
+      * @return The reinterpretation of the [[DataStream]] as a [[KeyedStream]].
+      */
+    def reinterpretAsKeyedStream[K: TypeInformation](
+      keySelector: T => K): KeyedStream[T, K] = {
+
+      val keySelectorWithType =
+        new KeySelectorWithType[T, K](clean(keySelector), implicitly[TypeInformation[K]])

+      asScalaStream(
+        JavaStreamUtils.reinterpretAsKeyedStream(self.javaStream, keySelectorWithType))
+    }
+
+    private[flink] def clean[F <: AnyRef](f: F): F = {
+      new StreamExecutionEnvironment(self.javaStream.getExecutionEnvironment).scalaClean(f)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/beff62d2/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/ReinterpretAsKeyedStreamITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/ReinterpretAsKeyedStreamITCase.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/ReinterpretAsKeyedStreamITCase.java
new file mode 100644
index 0000000..e068ff1
--- /dev/null
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/ReinterpretAsKeyedStreamITCase.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * ITCase for {@link DataStreamUtils#reinterpretAsKeyedStream(DataStream, KeySelector)}.
+ */
+public class ReinterpretAsKeyedStreamITCase {
+
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       /**
+        * This test checks that reinterpreting a data stream to a keyed stream works as expected. This test consists of
+        * two jobs. The first job materializes a keyBy into files, one file per partition. The second job opens the
+        * files created by the first job as sources (doing the correct assignment of files to partitions) and
+        * reinterprets the sources as keyed, because we know they have been partitioned by the keyBy in the first job.
+        */
+       @Test
+       public void testReinterpretAsKeyedStream() throws Exception {
+
+               final int numEventsPerInstance = 100;
+               final int maxParallelism = 8;
+               final int parallelism = 3;
+               final int numUniqueKeys = 12;
+
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+               env.setMaxParallelism(maxParallelism);
+               env.setParallelism(parallelism);
+
+               final List<File> partitionFiles = new ArrayList<>(parallelism);
+               for (int i = 0; i < parallelism; ++i) {
+                       File partitionFile = temporaryFolder.newFile();
+                       partitionFiles.add(i, partitionFile);
+               }
+
+               env.addSource(new RandomTupleSource(numEventsPerInstance, numUniqueKeys))
+                       .keyBy(0)
+                       .addSink(new ToPartitionFileSink(partitionFiles));
+
+               env.execute();
+
+               DataStreamUtils.reinterpretAsKeyedStream(
+                       env.addSource(new FromPartitionFileSource(partitionFiles)),
+                       (KeySelector<Tuple2<Integer, Integer>, Integer>) value -> value.f0,
+                       TypeInformation.of(Integer.class))
+                       .timeWindow(Time.seconds(1)) // test that timers and aggregated state also work as expected
+                       .reduce((ReduceFunction<Tuple2<Integer, Integer>>) (value1, value2) ->
+                               new Tuple2<>(value1.f0, value1.f1 + value2.f1))
+                       .addSink(new ValidatingSink(numEventsPerInstance * parallelism)).setParallelism(1);
+
+               env.execute();
+       }
+
+       private static class RandomTupleSource implements ParallelSourceFunction<Tuple2<Integer, Integer>> {
+               private static final long serialVersionUID = 1L;
+
+               private int numKeys;
+               private int remainingEvents;
+
+               public RandomTupleSource(int numEvents, int numKeys) {
+                       this.numKeys = numKeys;
+                       this.remainingEvents = numEvents;
+               }
+
+               @Override
+               public void run(SourceContext<Tuple2<Integer, Integer>> out) throws Exception {
+                       Random random = new Random(42);
+                       while (--remainingEvents >= 0) {
+                               out.collect(new Tuple2<>(random.nextInt(numKeys), 1));
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       this.remainingEvents = 0;
+               }
+       }
+
+       private static class ToPartitionFileSink extends RichSinkFunction<Tuple2<Integer, Integer>> {
+
+               private static final long serialVersionUID = 1L;
+
+               private final List<File> allPartitions;
+               private DataOutputStream dos;
+
+               public ToPartitionFileSink(List<File> allPartitions) {
+                       this.allPartitions = allPartitions;
+               }
+
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       super.open(parameters);
+                       int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
+                       dos = new DataOutputStream(
+                               new BufferedOutputStream(
+                                       new FileOutputStream(allPartitions.get(subtaskIdx))));
+               }
+
+               @Override
+               public void close() throws Exception {
+                       super.close();
+                       dos.close();
+               }
+
+               @Override
+               public void invoke(Tuple2<Integer, Integer> value, Context context) throws Exception {
+                       dos.writeInt(value.f0);
+                       dos.writeInt(value.f1);
+               }
+       }
+
+       private static class FromPartitionFileSource extends RichParallelSourceFunction<Tuple2<Integer, Integer>> {
+               private static final long serialVersionUID = 1L;
+
+               private List<File> allPartitions;
+               private DataInputStream din;
+               private volatile boolean running;
+
+               public FromPartitionFileSource(List<File> allPartitions) {
+                       this.allPartitions = allPartitions;
+               }
+
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       super.open(parameters);
+                       int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
+                       din = new DataInputStream(
+                               new BufferedInputStream(
+                                       new FileInputStream(allPartitions.get(subtaskIdx))));
+               }
+
+               @Override
+               public void close() throws Exception {
+                       super.close();
+                       din.close();
+               }
+
+               @Override
+               public void run(SourceContext<Tuple2<Integer, Integer>> out) throws Exception {
+                       this.running = true;
+                       try {
+                               while (running) {
+                                       Integer key = din.readInt();
+                                       Integer val = din.readInt();
+                                       out.collect(new Tuple2<>(key, val));
+                               }
+                       } catch (EOFException ignore) {
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       this.running = false;
+               }
+       }
+
+       private static class ValidatingSink extends RichSinkFunction<Tuple2<Integer, Integer>> {
+
+               private static final long serialVersionUID = 1L;
+               private final int expectedSum;
+               private int runningSum = 0;
+
+               private ValidatingSink(int expectedSum) {
+                       this.expectedSum = expectedSum;
+               }
+
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       super.open(parameters);
+                       Preconditions.checkState(getRuntimeContext().getNumberOfParallelSubtasks() == 1);
+               }
+
+               @Override
+               public void invoke(Tuple2<Integer, Integer> value, Context context) throws Exception {
+                       runningSum += value.f1;
+               }
+
+               @Override
+               public void close() throws Exception {
+                       Assert.assertEquals(expectedSum, runningSum);
+                       super.close();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/beff62d2/flink-contrib/flink-streaming-contrib/src/test/scala/org/apache/flink/streaming/api/scala/ReinterpretDataStreamAsKeyedStreamITCase.scala
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/scala/org/apache/flink/streaming/api/scala/ReinterpretDataStreamAsKeyedStreamITCase.scala b/flink-contrib/flink-streaming-contrib/src/test/scala/org/apache/flink/streaming/api/scala/ReinterpretDataStreamAsKeyedStreamITCase.scala
new file mode 100644
index 0000000..2a36f1a
--- /dev/null
+++ b/flink-contrib/flink-streaming-contrib/src/test/scala/org/apache/flink/streaming/api/scala/ReinterpretDataStreamAsKeyedStreamITCase.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.contrib.streaming.scala.utils.DataStreamUtils
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.junit.Test
+
+
+/**
+  * Integration test for [[DataStreamUtils.reinterpretAsKeyedStream()]].
+  */
+class ReinterpretDataStreamAsKeyedStreamITCase {
+
+  @Test
+  def testReinterpretAsKeyedStream(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val source = env.fromElements(1, 2, 3)
+    new DataStreamUtils(source).reinterpretAsKeyedStream((in) => in)
+      .timeWindow(Time.seconds(1))
+      .reduce((a, b) => a + b)
+      .addSink(new DiscardingSink[Int])
+    env.execute()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/beff62d2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index ebcd7d5..94c6996 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -120,12 +120,37 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
         *            Function for determining state partitions
         */
        public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
-               super(
-                       dataStream.getExecutionEnvironment(),
+               this(
+                       dataStream,
                        new PartitionTransformation<>(
                                dataStream.getTransformation(),
-                               new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)));
-               this.keySelector = keySelector;
+                               new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),
+                       keySelector,
+                       keyType);
+       }
+
+       /**
+        * Creates a new {@link KeyedStream} using the given {@link KeySelector} and {@link TypeInformation}
+        * to partition operator state by key, where the partitioning is defined by a {@link PartitionTransformation}.
+        *
+        * @param stream
+        *            Base stream of data
+        * @param partitionTransformation
+        *            Function that determines how the keys are distributed to downstream operator(s)
+        * @param keySelector
+        *            Function to extract keys from the base stream
+        * @param keyType
+        *            Defines the type of the extracted keys
+        */
+       @Internal
+       public KeyedStream(
+               DataStream<T> stream,
+               PartitionTransformation<T> partitionTransformation,
+               KeySelector<T, KEY> keySelector,
+               TypeInformation<KEY> keyType) {
+
+               super(stream.getExecutionEnvironment(), partitionTransformation);
+               this.keySelector = clean(keySelector);
                this.keyType = validateKeyType(keyType);
        }
 

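
For context, the only functional difference between the two KeyedStream constructors above is the partitioner that feeds the new stream. Both partitioner call sites below are taken from this commit; 'input' is a hypothetical stand-in for any upstream DataStream:

    // keyBy(): hash-partitions records into key groups, i.e. a network shuffle.
    new PartitionTransformation<>(input.getTransformation(),
            new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM));

    // reinterpretAsKeyedStream(): keeps every record in its current partition.
    new PartitionTransformation<>(input.getTransformation(),
            new ForwardPartitioner<>());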