http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnDataStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnDataStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnDataStream.scala
new file mode 100644
index 0000000..8d98c46
--- /dev/null
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnDataStream.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream}
+
+/**
+  * Wraps a data stream, allowing to use anonymous partial functions to
+  * perform extraction of items in a tuple, case class instance or collection
+  *
+  * @param stream The wrapped data stream
+  * @tparam T The type of the data stream items
+  */
+class OnDataStream[T](stream: DataStream[T]) {
+
+  /**
+    * Applies a function `fun` to each item of the stream
+    *
+    * @param fun The function to be applied to each item
+    * @tparam R The type of the items in the returned stream
+    * @return A dataset of R
+    */
+  @PublicEvolving
+  def mapWith[R: TypeInformation](fun: T => R): DataStream[R] =
+    stream.map(fun)
+
+  /**
+    * Applies a function `fun` to each item of the stream, producing a 
collection of items
+    * that will be flattened in the resulting stream
+    *
+    * @param fun The function to be applied to each item
+    * @tparam R The type of the items in the returned stream
+    * @return A dataset of R
+    */
+  @PublicEvolving
+  def flatMapWith[R: TypeInformation](fun: T => TraversableOnce[R]): 
DataStream[R] =
+    stream.flatMap(fun)
+
+  /**
+    * Applies a predicate `fun` to each item of the stream, keeping only those 
for which
+    * the predicate holds
+    *
+    * @param fun The predicate to be tested on each item
+    * @return A dataset of R
+    */
+  @PublicEvolving
+  def filterWith(fun: T => Boolean): DataStream[T] =
+    stream.filter(fun)
+
+  /**
+    * Keys the items according to a keying function `fun`
+    *
+    * @param fun The keying function
+    * @tparam K The type of the key, for which type information must be known
+    * @return A stream of Ts keyed by Ks
+    */
+  @PublicEvolving
+  def keyingBy[K: TypeInformation](fun: T => K): KeyedStream[T, K] =
+    stream.keyBy(fun)
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedStream.scala
new file mode 100644
index 0000000..226eb8b
--- /dev/null
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedStream.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.{DataStream, JoinedStreams}
+import org.apache.flink.streaming.api.windowing.windows.Window
+
+/**
+  * Wraps a joined data stream, allowing to use anonymous partial functions to
+  * perform extraction of items in a tuple, case class instance or collection
+  *
+  * @param stream The wrapped data stream
+  * @tparam L The type of the data stream items from the left side of the join
+  * @tparam R The type of the data stream items from the right input of the 
join
+  * @tparam K The type of key
+  * @tparam W The type of the window
+  */
+class OnJoinedStream[L, R, K, W <: Window](
+    stream: JoinedStreams[L, R]#Where[K]#EqualTo#WithWindow[W]) {
+
+  /**
+    * Completes the join operation with the user function that is executed
+    * for windowed groups.
+    *
+    * @param fun The function that defines the projection of the join
+    * @tparam O The return type of the projection, for which type information 
must be known
+    * @return A fully joined data set of Os
+    */
+  @PublicEvolving
+  def projecting[O: TypeInformation](fun: (L, R) => O): DataStream[O] =
+    stream.apply(fun)
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnKeyedStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnKeyedStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnKeyedStream.scala
new file mode 100644
index 0000000..218bcbf
--- /dev/null
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnKeyedStream.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream}
+
+/**
+  * Wraps a keyed data stream, allowing to use anonymous partial functions to
+  * perform extraction of items in a tuple, case class instance or collection
+  *
+  * @param stream The wrapped data stream
+  * @tparam T The type of the data stream items
+  * @tparam K The type of key
+  */
+class OnKeyedStream[T, K](stream: KeyedStream[T, K]) {
+
+  /**
+    * Applies a reducer `fun` to the stream
+    *
+    * @param fun The reducing function to be applied on the keyed stream
+    * @return A data set of Ts
+    */
+  @PublicEvolving
+  def reduceWith(fun: (T, T) => T): DataStream[T] =
+    stream.reduce(fun)
+
+  /**
+    * Folds the stream over a zero element with a reducer `fun`
+    *
+    * @param initialValue The zero element
+    * @param fun The reducing function to be applied on the keyed stream
+    * @return A data set of Rs
+    */
+  @PublicEvolving
+  def foldWith[R: TypeInformation](initialValue: R)(fun: (R, T) => R): 
DataStream[R] =
+    stream.fold(initialValue)(fun)
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStream.scala
new file mode 100644
index 0000000..f7a5923
--- /dev/null
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStream.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.{DataStream, WindowedStream}
+import org.apache.flink.streaming.api.windowing.windows.Window
+
+/**
+  * Wraps a joined data stream, allowing to use anonymous partial functions to
+  * perform extraction of items in a tuple, case class instance or collection
+  *
+  * @param stream The wrapped data stream
+  * @tparam T The type of the data stream items from the right input of the 
join
+  * @tparam K The type of key
+  * @tparam W The type of the window
+  */
+class OnWindowedStream[T, K, W <: Window](stream: WindowedStream[T, K, W]) {
+
+  /**
+    * Applies a reduce function to the window. The window function is called 
for each evaluation
+    * of the window for each key individually. The output of the reduce 
function is interpreted
+    * as a regular non-windowed stream.
+    *
+    * This window will try and pre-aggregate data as much as the window 
policies permit.
+    * For example,tumbling time windows can perfectly pre-aggregate the data, 
meaning that only one
+    * element per key is stored. Sliding time windows will pre-aggregate on 
the granularity of the
+    * slide interval, so a few elements are stored per key (one per slide 
interval).
+    * Custom windows may not be able to pre-aggregate, or may need to store 
extra values in an
+    * aggregation tree.
+    *
+    * @param function The reduce function.
+    * @return The data stream that is the result of applying the reduce 
function to the window.
+    */
+  @PublicEvolving
+  def reduceWith(function: (T, T) => T) =
+    stream.reduce(function)
+
+  /**
+    * Applies the given fold function to each window. The window function is 
called for each
+    * evaluation of the window for each key individually. The output of the 
reduce function is
+    * interpreted as a regular non-windowed stream.
+    *
+    * @param function The fold function.
+    * @return The data stream that is the result of applying the fold function 
to the window.
+    */
+  @PublicEvolving
+  def foldWith[R: TypeInformation](initialValue: R)(function: (R, T) => R) =
+    stream.fold(initialValue)(function)
+
+  /**
+    * Applies the given window function to each window. The window function is 
called for each
+    * evaluation of the window for each key individually. The output of the 
window function is
+    * interpreted as a regular non-windowed stream.
+    *
+    * Arriving data is incrementally aggregated using the given fold function.
+    *
+    * @param initialValue The initial value of the fold
+    * @param foldFunction The fold function that is used for incremental 
aggregation
+    * @param windowFunction The window function.
+    * @return The data stream that is the result of applying the window 
function to the window.
+    */
+  @PublicEvolving
+  def applyWith[R: TypeInformation](
+      initialValue: R)(
+      foldFunction: (R, T) => R,
+      windowFunction: (K, W, Stream[R]) => TraversableOnce[R])
+    : DataStream[R] =
+    stream.apply(initialValue, foldFunction, {
+      (key, window, items, out) =>
+        windowFunction(key, window, items.toStream).foreach(out.collect)
+    })
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/package.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/package.scala
new file mode 100644
index 0000000..f82bb74
--- /dev/null
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/package.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.annotation.PublicEvolving
+import 
org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions._
+import org.apache.flink.streaming.api.windowing.windows.Window
+
+/**
+  * acceptPartialFunctions extends the original DataStream with methods with 
unique names
+  * that delegate to core higher-order functions (e.g. `map`) so that we can 
work around
+  * the fact that overloaded methods taking functions as parameters can't 
accept partial
+  * functions as well. This enables the possibility to directly apply pattern 
matching
+  * to decompose inputs such as tuples, case classes and collections.
+  *
+  * The following is a small example that showcases how this extensions would 
work on
+  * a Flink data stream:
+  *
+  * {{{
+  *   object Main {
+  *     import org.apache.flink.streaming.api.scala.extensions._
+  *     case class Point(x: Double, y: Double)
+  *     def main(args: Array[String]): Unit = {
+  *       val env = StreamExecutionEnvironment.getExecutionEnvironment
+  *       val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
+  *       ds.filterWith {
+  *         case Point(x, _) => x > 1
+  *       }.reduceWith {
+  *         case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 + y2)
+  *       }.mapWith {
+  *         case Point(x, y) => (x, y)
+  *       }.flatMapWith {
+  *         case (x, y) => Seq('x' -> x, 'y' -> y)
+  *       }.keyingBy {
+  *         case (id, value) => id
+  *       }
+  *     }
+  *   }
+  * }}}
+  *
+  * The extension consists of several implicit conversions over all the data 
stream representations
+  * that could gain from this feature. To use this set of extensions methods 
the user has to
+  * explicitly opt-in by importing
+  * `org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions`.
+  *
+  * For more information and usage examples please consult the Apache Flink 
official documentation.
+  *
+  */
+package object extensions {
+
+  @PublicEvolving
+  implicit def acceptPartialFunctions[T](ds: DataStream[T]): OnDataStream[T] =
+    new OnDataStream[T](ds)
+
+  @PublicEvolving
+  implicit def acceptPartialFunctions[T, K](ds: KeyedStream[T, K]): 
OnKeyedStream[T, K] =
+    new OnKeyedStream[T, K](ds)
+
+  @PublicEvolving
+  implicit def acceptPartialFunctions[L, R, K, W <: Window](
+      ds: JoinedStreams[L, R]#Where[K]#EqualTo#WithWindow[W]): 
OnJoinedStream[L, R, K, W] =
+    new OnJoinedStream[L, R, K, W](ds)
+
+  @PublicEvolving
+  implicit def acceptPartialFunctions[IN1, IN2](
+      ds: ConnectedStreams[IN1, IN2]): OnConnectedStream[IN1, IN2] =
+    new OnConnectedStream[IN1, IN2](ds)
+
+  @PublicEvolving
+  implicit def acceptPartialFunctions[T, K, W <: Window](
+      ds: WindowedStream[T, K, W]): OnWindowedStream[T, K, W] =
+    new OnWindowedStream[T, K, W](ds)
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/base/AcceptPFTestBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/base/AcceptPFTestBase.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/base/AcceptPFTestBase.scala
new file mode 100644
index 0000000..86176e6
--- /dev/null
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/base/AcceptPFTestBase.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala.extensions.base
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala.extensions.data.KeyValuePair
+import org.apache.flink.util.TestLogger
+import org.scalatest.junit.JUnitSuiteLike
+
+/**
+  * Common facilities to test the `acceptPartialFunctions` extension
+  */
+private[extensions] abstract class AcceptPFTestBase extends TestLogger with 
JUnitSuiteLike {
+
+  private val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+  protected val tuples = env.fromElements(
+    1 -> "hello",
+    2 -> "world",
+    3 -> "foo",
+    4 -> "bar",
+    5 -> "baz",
+    6 -> "quux")
+  protected val caseObjects = env.fromElements(
+    KeyValuePair(1, "hello"),
+    KeyValuePair(2, "world"),
+    KeyValuePair(3, "foo"),
+    KeyValuePair(4, "bar"),
+    KeyValuePair(5, "baz"),
+    KeyValuePair(6, "quux"))
+
+  protected val keyedTuples = tuples.keyBy(_._1)
+  protected val keyedCaseObjects = caseObjects.keyBy(_.id)
+
+  protected val windowedTuples = keyedTuples.countWindow(2)
+  protected val windowedCaseObjects = keyedCaseObjects.countWindow(2)
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/data/KeyValuePair.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/data/KeyValuePair.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/data/KeyValuePair.scala
new file mode 100644
index 0000000..fad8189
--- /dev/null
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/data/KeyValuePair.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala.extensions.data
+
+/**
+  * Simple case class to test the `acceptPartialFunctions` extension
+  *
+  * @param id A numerical identifier
+  * @param value A textual value
+  */
+private [extensions] case class KeyValuePair(id: Int, value: String)

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnConnectedDataStreamTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnConnectedDataStreamTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnConnectedDataStreamTest.scala
new file mode 100644
index 0000000..f5bb89a
--- /dev/null
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnConnectedDataStreamTest.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
+import org.apache.flink.streaming.api.scala.ConnectedStreams
+import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.streaming.api.scala.extensions.base.AcceptPFTestBase
+import org.apache.flink.streaming.api.scala.extensions.data.KeyValuePair
+import org.junit.Test
+
+class OnConnectedDataStreamTest extends AcceptPFTestBase {
+
+  @Test
+  def testMapWithOnTuple(): Unit = {
+    val test =
+      tuples.connect(tuples).mapWith({
+        case (id, value) => s"$id $value"
+      }, {
+        case (id, value) => s"$id $value"
+      })
+    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+      "mapWith should produce a SingleOutputStreamOperator")
+  }
+
+  @Test
+  def testMapWithOnCaseClass(): Unit = {
+    val test =
+      caseObjects.connect(caseObjects).mapWith({
+        case KeyValuePair(id, value) => s"$id $value"
+      }, {
+        case KeyValuePair(id, value) => s"$id $value"
+      })
+    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+      "mapWith should produce a SingleOutputStreamOperator")
+  }
+
+  @Test
+  def testFlatMapWithOnTuple(): Unit = {
+    val test =
+      tuples.connect(tuples).flatMapWith({
+        case (id, value) => List(id.toString, value)
+      }, {
+        case (id, value) => List(id.toString, value)
+      })
+    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+      "flatMapWith should produce a SingleOutputStreamOperator")
+  }
+
+  @Test
+  def testFlatMapWithOnCaseClass(): Unit = {
+    val test =
+      caseObjects.connect(caseObjects).flatMapWith({
+        case KeyValuePair(id, value) => List(id.toString, value)
+      }, {
+        case KeyValuePair(id, value) => List(id.toString, value)
+      })
+    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+      "flatMapWith should produce a SingleOutputStreamOperator")
+  }
+
+  @Test
+  def testKeyingByOnTuple(): Unit = {
+    val test =
+      tuples.connect(tuples).keyingBy({
+        case (id, _) => id
+      }, {
+        case (id, _) => id
+      })
+    assert(test.isInstanceOf[ConnectedStreams[_, _]],
+      "keyingBy should produce a ConnectedStreams")
+  }
+
+  @Test
+  def testKeyingByOnCaseClass(): Unit = {
+    val test =
+      caseObjects.connect(caseObjects).keyingBy({
+        case KeyValuePair(id, _) => id
+      }, {
+        case KeyValuePair(id, _) => id
+      })
+    assert(test.isInstanceOf[ConnectedStreams[_, _]],
+      "keyingBy should produce a ConnectedStreams")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnDataStreamTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnDataStreamTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnDataStreamTest.scala
new file mode 100644
index 0000000..fb6d865
--- /dev/null
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnDataStreamTest.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.datastream.{KeyedStream, 
SingleOutputStreamOperator}
+import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.streaming.api.scala.extensions.base.AcceptPFTestBase
+import org.apache.flink.streaming.api.scala.extensions.data.KeyValuePair
+import org.junit.Test
+
+class OnDataStreamTest extends AcceptPFTestBase {
+
+  @Test
+  def testMapWithOnTuple(): Unit = {
+    val test =
+      tuples.mapWith {
+        case (id, value) => s"$id $value"
+      }
+    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+      "mapWith should produce a SingleOutputStreamOperator")
+  }
+
+  @Test
+  def testMapWithOnCaseClass(): Unit = {
+    val test =
+      caseObjects.mapWith {
+        case KeyValuePair(id, value) => s"$id $value"
+      }
+    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+      "mapWith should produce a SingleOutputStreamOperator")
+  }
+
+  @Test
+  def testFlatMapWithOnTuple(): Unit = {
+    val test =
+      tuples.flatMapWith {
+        case (id, value) => List(id.toString, value)
+      }
+    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+      "flatMapWith should produce a SingleOutputStreamOperator")
+  }
+
+  @Test
+  def testFlatMapWithOnCaseClass(): Unit = {
+    val test =
+      caseObjects.flatMapWith {
+        case KeyValuePair(id, value) => List(id.toString, value)
+      }
+    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+      "flatMapWith should produce a SingleOutputStreamOperator")
+  }
+
+  @Test
+  def testFilterWithOnTuple(): Unit = {
+    val test =
+      tuples.filterWith {
+        case (id, value) => id == 1
+      }
+    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+      "filterWith should produce a SingleOutputStreamOperator")
+  }
+
+  @Test
+  def testFilterWithOnCaseClass(): Unit = {
+    val test =
+      caseObjects.filterWith {
+        case KeyValuePair(id, value) => id == 1
+      }
+    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+      "filterWith should produce a SingleOutputStreamOperator")
+  }
+
+  @Test
+  def testKeyingByOnTuple(): Unit = {
+    val test =
+      tuples.keyingBy {
+        case (id, _) => id
+      }
+    assert(test.javaStream.isInstanceOf[KeyedStream[_, _]],
+      "keyingBy should produce a KeyedStream")
+  }
+
+  @Test
+  def testKeyingByOnCaseClass(): Unit = {
+    val test =
+      caseObjects.keyingBy {
+        case KeyValuePair(id, _) => id
+      }
+    assert(test.javaStream.isInstanceOf[KeyedStream[_, _]],
+      "keyingBy should produce a KeyedStream")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedDataStreamTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedDataStreamTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedDataStreamTest.scala
new file mode 100644
index 0000000..34c55d7
--- /dev/null
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedDataStreamTest.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.streaming.api.scala.extensions.base.AcceptPFTestBase
+import org.apache.flink.streaming.api.scala.extensions.data.KeyValuePair
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.junit.Test
+
+class OnJoinedDataStreamTest extends AcceptPFTestBase {
+
+  @Test
+  def testProjectingOnTuple(): Unit = {
+    val test =
+      tuples.join(tuples).
+        where {
+          case (id, _) => id
+        }.equalTo {
+          case (id, _) => id
+        }.window {
+          TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))
+        }.projecting {
+          case ((_, v1), (_, v2)) => s"$v1 $v2"
+        }
+    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+      "projecting should produce a SingleOutputStreamOperator")
+  }
+
+  @Test
+  def testProjectingOnCaseClass(): Unit = {
+    val test =
+      caseObjects.join(caseObjects).
+      where {
+        case KeyValuePair(id, _) => id
+      }.equalTo {
+        case KeyValuePair(id, _) => id
+      }.window {
+        TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))
+      }.projecting {
+        case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => s"$v1 $v2"
+      }
+   assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+     "projecting should produce a SingleOutputStreamOperator")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnKeyedDataStreamTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnKeyedDataStreamTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnKeyedDataStreamTest.scala
new file mode 100644
index 0000000..f6f153a
--- /dev/null
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnKeyedDataStreamTest.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
+import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.streaming.api.scala.extensions.base.AcceptPFTestBase
+import org.apache.flink.streaming.api.scala.extensions.data.KeyValuePair
+import org.junit.Test
+
+class OnKeyedDataStreamTest extends AcceptPFTestBase {
+
+  @Test
+  def testReduceWithOnTuple(): Unit = {
+    val test =
+      keyedTuples.reduceWith {
+        case ((_, v1), (_, v2)) => 0 -> s"$v1 $v2"
+      }
+    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+      "reduceWith should produce a SingleOutputStreamOperator")
+  }
+
+  @Test
+  def testReduceWithOnCaseClass(): Unit = {
+    val test =
+      keyedCaseObjects.reduceWith {
+        case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => KeyValuePair(0, 
s"$v1 $v2")
+      }
+    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+      "reduceWith should produce a SingleOutputStreamOperator")
+  }
+
+  @Test
+  def testFoldWithOnTuple(): Unit = {
+    val test =
+      keyedTuples.foldWith("") {
+        case (folding, (_, value)) => s"$folding $value"
+      }
+    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+      "flatMapWith should produce a SingleOutputStreamOperator")
+  }
+
+  @Test
+  def testFoldWithOnCaseClass(): Unit = {
+    val test =
+      keyedCaseObjects.foldWith("") {
+        case (folding, KeyValuePair(_, value)) => s"$folding $value"
+      }
+    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+      "flatMapWith should produce a SingleOutputStreamOperator")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedDataStreamTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedDataStreamTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedDataStreamTest.scala
new file mode 100644
index 0000000..4fa9f5a
--- /dev/null
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedDataStreamTest.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
+import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.streaming.api.scala.extensions.base.AcceptPFTestBase
+import org.apache.flink.streaming.api.scala.extensions.data.KeyValuePair
+import org.junit.Test
+
+class OnWindowedDataStreamTest extends AcceptPFTestBase {
+
+  @Test
+  def testReduceWithOnTuple(): Unit = {
+    val test =
+      windowedTuples.reduceWith {
+        case ((_, v1), (_, v2)) => 0 -> s"$v1 $v2"
+      }
+    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+      "reduceWith should produce a SingleOutputStreamOperator")
+  }
+
+  @Test
+  def testReduceWithOnCaseClass(): Unit = {
+    val test =
+      windowedCaseObjects.reduceWith {
+        case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => KeyValuePair(0, 
s"$v1 $v2")
+      }
+    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+      "reduceWith should produce a SingleOutputStreamOperator")
+  }
+
+  @Test
+  def testFoldWithOnTuple(): Unit = {
+    val test =
+      windowedTuples.foldWith("") {
+        case (folding, (_, value)) => s"$folding $value"
+      }
+    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+      "foldWith should produce a SingleOutputStreamOperator")
+  }
+
+  @Test
+  def testFoldWithOnCaseClass(): Unit = {
+    val test =
+      windowedCaseObjects.foldWith("") {
+        case (folding, KeyValuePair(_, value)) => s"$folding $value"
+      }
+    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+      "foldWith should produce a SingleOutputStreamOperator")
+  }
+
+  @Test
+  def testApplyWithOnTuple(): Unit = {
+    val test =
+      windowedTuples.applyWith("")(
+        foldFunction = {
+          case (folding, (_, value)) => s"$folding $value"
+        },
+        windowFunction = {
+          case (n, w, head #:: neck #:: _) => Seq(n.toString, 
w.maxTimestamp().toString, head, neck)
+        })
+    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+      "applyWith should produce a SingleOutputStreamOperator")
+  }
+
+  @Test
+  def testApplyWithOnCaseClass(): Unit = {
+    val test =
+      windowedCaseObjects.applyWith("")(
+        foldFunction = {
+          case (folding, KeyValuePair(_, value)) => s"$folding $value"
+        },
+        windowFunction = {
+          case (n, w, head #:: neck #:: _) => Seq(n.toString, 
w.maxTimestamp().toString, head, neck)
+        })
+    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+      "applyWith should produce a SingleOutputStreamOperator")
+  }
+
+}

Reply via email to