twalthr commented on a change in pull request #13320:
URL: https://github.com/apache/flink/pull/13320#discussion_r483579510



##########
File path: flink-streaming-scala/pom.xml
##########
@@ -262,6 +262,22 @@ under the License.
                                                        
<exclude>org.apache.flink.streaming.api.scala.DataStream#iterate\$default\$3()</exclude>
                                                        
<exclude>org.apache.flink.streaming.api.scala.DataStream#assignTimestamps(org.apache.flink.streaming.api.functions.TimestampExtractor)</exclude>
                                                        
<exclude>org.apache.flink.streaming.api.scala.ConnectedStreams#keyBy(scala.Function1,scala.Function1,org.apache.flink.api.common.typeinfo.TypeInformation,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>
+
+                                                       
<exclude>org.apache.flink.streaming.api.scala.AllWindowedStream#apply(java.lang.Object,org.apache.flink.api.common.functions.FoldFunction,org.apache.flink.streaming.api.scala.function.AllWindowFunction,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>
+                                                       
<exclude>org.apache.flink.streaming.api.scala.AllWindowedStream#apply(java.lang.Object,scala.Function2,scala.Function3,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>
+                                                       
<exclude>org.apache.flink.streaming.api.scala.AllWindowedStream#fold(java.lang.Object,scala.Function2,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>
+                                                       
<exclude>org.apache.flink.streaming.api.scala.AllWindowedStream#fold(java.lang.Object,org.apache.flink.api.common.functions.FoldFunction,org.apache.flink.streaming.api.scala.function.AllWindowFunction,org.apache.flink.api.common.typeinfo.TypeInformation,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>
+                                                       
<exclude>org.apache.flink.streaming.api.scala.AllWindowedStream#fold(java.lang.Object,scala.Function2,scala.Function3,org.apache.flink.api.common.typeinfo.TypeInformation,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>
+                                                       
<exclude>org.apache.flink.streaming.api.scala.KeyedStream#fold(java.lang.Object,org.apache.flink.api.common.functions.FoldFunction,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>
+                                                       
<exclude>org.apache.flink.streaming.api.scala.KeyedStream#fold(java.lang.Object,scala.Function2,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>
+                                                       
<exclude>org.apache.flink.streaming.api.scala.WindowedStream#apply(java.lang.Object,org.apache.flink.api.common.functions.FoldFunction,org.apache.flink.streaming.api.scala.function.WindowFunction,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>
+                                                       
<exclude>org.apache.flink.streaming.api.scala.WindowedStream#apply(java.lang.Object,scala.Function2,scala.Function4,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>
+                                                       
<exclude>org.apache.flink.streaming.api.scala.WindowedStream#fold(java.lang.Object,scala.Function2,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>
+                                                       
<exclude>org.apache.flink.streaming.api.scala.WindowedStream#fold(java.lang.Object,org.apache.flink.api.common.functions.FoldFunction,org.apache.flink.streaming.api.scala.function.WindowFunction,org.apache.flink.api.common.typeinfo.TypeInformation,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>
+                                                       
<exclude>org.apache.flink.streaming.api.scala.WindowedStream#fold(java.lang.Object,scala.Function2,scala.Function4,org.apache.flink.api.common.typeinfo.TypeInformation,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>
+

Review comment:
       nit: remove empty line and add a comment to this section that fold was 
dropped in 1.12

##########
File path: 
flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
##########
@@ -35,10 +35,13 @@ import org.apache.flink.streaming.runtime.partitioner._
 import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.util.Collector
 
+import org.hamcrest.CoreMatchers.equalTo
 import org.junit.Assert._
 import org.junit.rules.ExpectedException
 import org.junit.{Rule, Test}
 
+import java.lang
+
 class DataStreamTest extends AbstractTestBase {

Review comment:
       update `testResource()` in this class

##########
File path: 
flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
##########
@@ -91,54 +91,10 @@ class AllWindowTranslationTest {
     fail("exception was not thrown")
   }
 
-  /**
-    * .fold() does not support [[RichFoldFunction]], since the reduce function 
is used internally
-    * in a [[org.apache.flink.api.common.state.FoldingState]].
-    */
-  @Test(expected = classOf[UnsupportedOperationException])
-  def testFoldWithRichFolderFails() {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val source = env.fromElements(("hello", 1), ("hello", 2))
-
-    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
-
-    source
-      .windowAll(SlidingEventTimeWindows.of(Time.seconds(1), 
Time.milliseconds(100)))
-      .fold(("", 0), new RichFoldFunction[(String, Int), (String, Int)] {
-        override def fold(accumulator: (String, Int), value: (String, Int)) = 
null
-      })
-
-    fail("exception was not thrown")
-  }
-
   // ------------------------------------------------------------------------
   //  merging window precondition
   // ------------------------------------------------------------------------
 
-  @Test
-  def testSessionWithFoldFails() {
-    // verify that fold does not work with merging windows
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val windowedStream = env.fromElements("Hello", "Ciao")
-      .windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)))
-
-    try
-      windowedStream.fold("", new FoldFunction[String, String]() {
-        @throws[Exception]
-        def fold(accumulator: String, value: String): String = accumulator
-      })
-
-    catch {
-      case _: UnsupportedOperationException =>
-        // expected
-        // use a catch to ensure that the exception is thrown by the fold

Review comment:
       this comment still occurs in the file

##########
File path: 
flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
##########
@@ -92,56 +93,10 @@ class WindowTranslationTest {
     fail("exception was not thrown")
   }
 
-  /**
-    * .fold() does not support [[RichFoldFunction]], since the reduce function 
is used internally
-    * in a [[org.apache.flink.api.common.state.FoldingState]].
-    */
-  @Test(expected = classOf[UnsupportedOperationException])
-  def testFoldWithRichFolderFails() {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val source = env.fromElements(("hello", 1), ("hello", 2))
-
-    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
-
-    source
-      .keyBy(0)
-      .window(SlidingEventTimeWindows.of(Time.seconds(1), 
Time.milliseconds(100)))
-      .fold(("", 0), new RichFoldFunction[(String, Int), (String, Int)] {
-        override def fold(accumulator: (String, Int), value: (String, Int)) = 
null
-      })
-
-    fail("exception was not thrown")
-  }
-
   // --------------------------------------------------------------------------
   //  merging window checks
   // --------------------------------------------------------------------------
 
-  @Test
-  def testSessionWithFoldFails() {
-    // verify that fold does not work with merging windows
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val windowedStream = env.fromElements("Hello", "Ciao")
-      .keyBy(x => x)
-      .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
-
-    try
-      windowedStream.fold("", new FoldFunction[String, String]() {
-        @throws[Exception]
-        def fold(accumulator: String, value: String): String = accumulator
-      })
-
-    catch {
-      case _: UnsupportedOperationException =>
-        // expected
-        // use a catch to ensure that the exception is thrown by the fold

Review comment:
       this comment still exists in the class




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to