gaoyunhaii commented on a change in pull request #23:
URL: https://github.com/apache/flink-ml/pull/23#discussion_r746302145



##########
File path: 
flink-ml-iteration/src/main/java/org/apache/flink/iteration/IterationBody.java
##########
@@ -44,7 +51,7 @@
  * streams returned by the IterationBody.
  */
 @Experimental
-public interface IterationBody {
+public interface IterationBody extends Serializable {

Review comment:
       It seems there would be error like the following stack. The lambda 
defined inside the IterationBody would require its wrapper class to be 
serializable.
   
   ```
   org.apache.flink.api.common.InvalidProgramException: Object 
org.apache.flink.test.iteration.BoundedMixedLifeCycleStreamIterationITCase$MixedLifeCycleIterationBody$$Lambda$738/65894433@56a4f272
 is not serializable
        at 
org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:195)
        at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2141)
        at 
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:203)
        at 
org.apache.flink.streaming.api.datastream.DataStream.filter(DataStream.java:697)
        at 
org.apache.flink.test.iteration.BoundedMixedLifeCycleStreamIterationITCase$MixedLifeCycleIterationBody.process(BoundedMixedLifeCycleStreamIterationITCase.java:187)
        at 
org.apache.flink.iteration.Iterations.createIteration(Iterations.java:231)
        at 
org.apache.flink.iteration.Iterations.iterateBoundedStreamsUntilTermination(Iterations.java:162)
        at 
org.apache.flink.test.iteration.BoundedMixedLifeCycleStreamIterationITCase.createJobGraphWithMixedLifeCycle(BoundedMixedLifeCycleStreamIterationITCase.java:126)
        at 
org.apache.flink.test.iteration.BoundedMixedLifeCycleStreamIterationITCase.testIterationBodyWithMixedLifeCycle(BoundedMixedLifeCycleStreamIterationITCase.java:91)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
        at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
        at 
org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
        at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
        at org.junit.rules.RunRules.evaluate(RunRules.java:20)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
        at 
org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:43)
        at 
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
        at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.Iterator.forEachRemaining(Iterator.java:116)
        at 
java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at 
java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
        at 
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at 
java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
        at 
org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:82)
        at 
org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:73)
        at 
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220)
        at 
org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188)
        at 
org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202)
        at 
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181)
        at 
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)
        at 
org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:142)
        at 
org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:113)
        at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
        at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
        at 
org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
        at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
   Caused by: java.io.NotSerializableException: 
org.apache.flink.test.iteration.BoundedMixedLifeCycleStreamIterationITCase$MixedLifeCycleIterationBody
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at 
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
        at 
org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:193)
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to