Thanks a lot Michalis. 
I just reviewed your PR; please take a look.
And let us know if you find any API discrepancies with Flink.
Best
--
Zoi

    On Wednesday, May 3, 2023 at 05:43:56 PM CEST, Michalis 
Vargiamis <[email protected]> wrote:  
 
 I created the pull request here 
[https://github.com/apache/incubator-wayang/pull/316].

I can also verify that the ReduceBy and Join operators throw the same 
exception I encountered! I'll check out the documentation as you suggested.


Thanks,

Michalis


On 03-May-23 4:38 PM, Zoi Kaoudi wrote:
>  Hi Michalis,
>
> first, I suggest making a pull request with what you already have working.
> Regarding the error, I have not encountered it before; I think there may be 
> a bug in the Flink operator implementation. Browsing the Flink tests we 
> currently have, it seems that ReduceBy and Join also throw some errors. Can 
> you check whether the ReduceBy and Join operators throw the same exception as 
> the one you have?
> If so, I suggest taking one of them and going to the Flink documentation of 
> that operator. It may be that we are using an older Flink API.
> Best
> --
> Zoi
>
>      On Wednesday, May 3, 2023 at 02:43:44 PM CEST, Michalis 
>Vargiamis <[email protected]> wrote:
>  
>  Hello!
>
>
> Regarding the Flink operator tests, there is progress. First of all, I
> looked at the already implemented Spark operator tests: some of them
> were already implemented for Flink, and some others did not exist for
> Flink (like the BernoulliSampleOperator). So far I have implemented
> the UnionAllOperator and the SortOperator, which went very
> smoothly, and also the MaterializedGroupByOperator, for which I had to
> tweak the test code and also add the following line to the
> operator code:
>
> .returns(this.getOutputType().getDataUnitType().getTypeClass());
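>
> That .returns(...) hint is needed because Java erases generic type
> parameters at runtime, so Flink cannot always recover a UDF's output type
> reflectively. The following is a minimal, framework-free sketch of the
> underlying issue (plain Java only; runtimeInterfaceName is a hypothetical
> helper, not Wayang or Flink code):

```java
import java.lang.reflect.Type;
import java.util.function.Function;

public class ErasureDemo {
    // Returns the runtime type name a framework would see when it
    // reflectively inspects a user lambda, roughly analogous to what
    // Flink's type extraction has to work with.
    static String runtimeInterfaceName(Function<?, ?> udf) {
        Type[] interfaces = udf.getClass().getGenericInterfaces();
        return interfaces.length > 0 ? interfaces[0].getTypeName() : "(none)";
    }

    public static void main(String[] args) {
        Function<String, Integer> udf = s -> s.length();
        // The String -> Integer signature is erased: only the raw interface
        // name survives, so frameworks need the output type supplied
        // explicitly, which is what .returns(...) does for a Flink DataSet.
        System.out.println(runtimeInterfaceName(udf));
    }
}
```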
>
>
> Now, regarding the CoGroupOperator, GlobalReduceOperator, and
> MapPartitionsOperator, I get the following error for all of them, which
> has got me kind of stuck:
>
>
> java.lang.IllegalArgumentException
>          at org.apache.flink.shaded.asm5.org.objectweb.asm.ClassReader.<init>(Unknown Source)
>          at org.apache.flink.shaded.asm5.org.objectweb.asm.ClassReader.<init>(Unknown Source)
>          at org.apache.flink.shaded.asm5.org.objectweb.asm.ClassReader.<init>(Unknown Source)
>          at org.apache.flink.api.java.ClosureCleaner.getClassReader(ClosureCleaner.java:148)
>          at org.apache.flink.api.java.ClosureCleaner.cleanThis0(ClosureCleaner.java:115)
>          at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:75)
>          at org.apache.flink.api.java.DataSet.clean(DataSet.java:186)
>          at org.apache.flink.api.java.operators.CoGroupOperator$CoGroupOperatorSets$CoGroupOperatorSetsPredicate$CoGroupOperatorWithoutFunction.with(CoGroupOperator.java:622)
>          at org.apache.wayang.flink.operators.FlinkCoGroupOperator.evaluate(FlinkCoGroupOperator.java:116)
>          at org.apache.wayang.flink.operators.FlinkOperatorTestBase.evaluate(FlinkOperatorTestBase.java:75)
>          at org.apache.wayang.flink.operators.FlinkCoGroupOperatorTest.testExecution(FlinkCoGroupOperatorTest.java:72)
>          at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>          at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>          at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>          at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>          at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>          at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>          at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>          at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>          at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>          at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>          at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>          at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>          at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>          at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>          at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>          at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>          at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>          at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>          at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>          at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
>          at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:43)
>          at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>          at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>          at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
>          at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
>          at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>          at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>          at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>          at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>          at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>          at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
>          at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:82)
>          at org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:73)
>          at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:248)
>          at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$5(DefaultLauncher.java:211)
>          at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:226)
>          at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:199)
>          at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:132)
>          at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:188)
>          at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:154)
>          at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:128)
>          at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:428)
>          at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162)
>          at org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:562)
>          at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548)
>
>
> Any help or feedback would be appreciated!
>
>
> Thanks,
>
> Michalis
>
>
> On 27-Apr-23 8:45 PM, Jorge Arnulfo Quiané Ruiz wrote:
>> Hi Michalis,
>>
>> Sure! It was more of a heads-up than anything else.
>> Please don’t hesitate to pose your questions here or to call for a meeting 
>> if necessary :)
>>
>> Best,
>> Jorge
>>
>>> On 27 Apr 2023, at 16.52, Michalis Vargiamis <[email protected]> 
>>> wrote:
>>>
>>> Hi!
>>>
>>>
>>> Thanks! Actually I've lost quite some time reconfiguring my setup. I 
>>> currently have a Windows laptop, so for Linux I initially tried the WSL 
>>> that Windows provides, but it turned out to make the whole 
>>> development/debugging process quite inefficient, so I switched to VMware, 
>>> but then again I lost some time on some other stuff. Anyway, I'd have to 
>>> ask for a bit more time on this one.
>>>
>>>
>>> Thank you,
>>>
>>> Michalis
>>>
>>>
>>> On 27-Apr-23 10:41 AM, Jorge Arnulfo Quiané Ruiz wrote:
>>>> Hi Michalis,
>>>>
>>>> Please let us know if you need help :)
>>>>
>>>> Best,
>>>> Jorge
>>>>
>>>>> On 25 Apr 2023, at 14.07, Zoi Kaoudi <[email protected]> wrote:
>>>>>
>>>>> Hi Michalis,
>>>>> can you double check that you define the types of the Tuple2 output?
>>>>>
>>>>>
>>>>> According to the error
>>>>> "Return type
>>>>> PojoType<org.apache.wayang.basic.data.Tuple2, fields = [field0:
>>>>> GenericType<java.lang.Object>, field1: GenericType<java.lang.Object>]>",
>>>>> it seems that the Tuple2 does not have specific types; its fields are 
>>>>> just java.lang.Objects. Maybe that could be the problem.
>>>>>
>>>>> Also, to give some context: if you look at the 
>>>>> FlinkMaterializedGroupByOperator code, there are some utility functions we 
>>>>> use to map the Wayang UDFs to the Flink (or Spark) UDFs. For example, the 
>>>>> line:
>>>>>
>>>>> final KeySelector<Type, KeyType> keyExtractor =
>>>>>     flinkExecutor.getCompiler().compileKeySelector(this.getKeyDescriptor());
>>>>>
>>>>> converts the Wayang UDF keyDescriptor to a Flink UDF KeySelector. Maybe 
>>>>> if you check the code of this class you could spot the problem?
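>>>>>
>>>>> To make that mapping concrete, here is a minimal, self-contained sketch
>>>>> of the adaptation pattern (the interfaces below are simplified
>>>>> stand-ins, not the real Wayang or Flink types; in particular, the real
>>>>> Flink KeySelector is tied to type extraction, and dropping that type
>>>>> information is exactly where a GenericType<java.lang.Object> key can
>>>>> come from):

```java
public class KeySelectorSketch {
    // Simplified stand-ins for the real Wayang/Flink types; the actual
    // interfaces live in wayang-core and flink-java.
    interface WayangKeyDescriptor<T, K> { K extractKey(T input); }
    interface FlinkStyleKeySelector<T, K> { K getKey(T value); }

    // The adaptation pattern compileKeySelector follows: wrap the Wayang
    // descriptor in the Flink-facing interface. Note that the generic
    // parameters exist only at compile time; at runtime the wrapper
    // carries no type information.
    static <T, K> FlinkStyleKeySelector<T, K> compileKeySelector(WayangKeyDescriptor<T, K> d) {
        return d::extractKey;
    }

    public static void main(String[] args) {
        WayangKeyDescriptor<String, Integer> byLength = String::length;
        FlinkStyleKeySelector<String, Integer> selector = compileKeySelector(byLength);
        System.out.println(selector.getKey("wayang")); // prints 6
    }
}
```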
>>>>> Best
>>>>> --
>>>>> Zoi
>>>>>
>>>>>      On Tuesday, April 25, 2023 at 01:30:20 PM CEST, 
>>>>>Michalis Vargiamis <[email protected]> wrote:
>>>>>
>>>>> Hello!
>>>>>
>>>>>
>>>>> I've been working on the missing operator tests for Flink. I've
>>>>> successfully done the SortOperator and the UnionAllOperator by looking at
>>>>> the respective Spark operator tests and changing RddChannel to
>>>>> DataSetChannel.
>>>>>
>>>>>
>>>>> I'm having trouble with the tests for other operators though, for
>>>>> example the FlinkMaterializedGroupByOperator. I tried starting with
>>>>> SparkMaterializedGroupByOperatorTest and doing the same RddChannel to
>>>>> DataSetChannel modifications as before, but I get the following error:
>>>>>
>>>>>
>>>>> [ERROR]
>>>>> org.apache.wayang.flink.operators.FlinkMaterializedGroupByOperatorTest.testExecution
>>>>> Time elapsed: 1.911 s  <<< ERROR!
>>>>> org.apache.flink.api.common.InvalidProgramException: Return type
>>>>> PojoType<org.apache.wayang.basic.data.Tuple2, fields = [field0:
>>>>> GenericType<java.lang.Object>, field1: GenericType<java.lang.Object>]>
>>>>> of KeySelector class
>>>>> org.apache.wayang.flink.compiler.KeySelectorFunction is not a valid key type
>>>>>            at org.apache.wayang.flink.operators.FlinkMaterializedGroupByOperatorTest.testExecution(FlinkMaterializedGroupByOperatorTest.java:50)
>>>>>
>>>>>
>>>>> Digging into the operator code a bit more, the error happens at
>>>>>
>>>>> dataSetInput.groupBy(keyExtractor);
>>>>>
>>>>>
>>>>> Any ideas on what should be changed?
>>>>>
>>>>>
>>>>> Here is a permalink to the respective spark test
>>>>> [https://github.com/apache/incubator-wayang/blob/6aad4eea8c91a52f2a41e79424491e6c2c5206af/wayang-platforms/wayang-spark/code/test/java/org/apache/wayang/spark/operators/SparkMaterializedGroupByOperatorTest.java]
>>>>>
>>>>>
>>>>> Thank you,
>>>>>
>>>>> Michalis Vargiamis
>>>>>