Hi!

Thanks! I've actually lost quite some time reconfiguring my setup. I currently have a Windows laptop, so for Linux I initially tried the WSL that Windows provides, but it made the whole development/debugging process quite inefficient. I then switched to VMware, where I again lost some time on other issues. Anyway, I'd like to ask for a bit more time on this one.


Thank you,

Michalis


On 27-Apr-23 10:41 AM, Jorge Arnulfo Quiané Ruiz wrote:
Hi Michalis,

Please let us know if you need help :)

Best,
Jorge

On 25 Apr 2023, at 14.07, Zoi Kaoudi <[email protected]> wrote:

Hi Michalis,
can you double check that you define the types of the Tuple2 output?


According to the error
"Return type
PojoType<org.apache.wayang.basic.data.Tuple2, fields = [field0:
GenericType<java.lang.Object>, field1: GenericType<java.lang.Object>]>"
it seems that the Tuple2 does not have specific types; its fields are plain
java.lang.Object. Maybe that is the problem.
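
As a side illustration of why a generic tuple can show up with java.lang.Object fields: the minimal sketch below uses only the JDK. The class `Pair` is a hypothetical stand-in and not Wayang's actual Tuple2; it only assumes that the tuple declares its fields with type parameters, which the `field0`/`field1` entries in the error message suggest. Class-level reflection sees the erased field types, so a framework that inspects the class without extra type information has nothing more specific than Object to work with.

```java
import java.lang.reflect.Field;

class ErasureDemo {

    // Hypothetical stand-in for a generic two-field tuple; NOT Wayang's actual Tuple2.
    static class Pair<T0, T1> {
        public T0 field0;
        public T1 field1;

        Pair(T0 field0, T1 field1) {
            this.field0 = field0;
            this.field1 = field1;
        }
    }

    // Returns the erased, declared type of a Pair field, i.e. what class-level reflection sees.
    static Class<?> declaredFieldType(String fieldName) {
        try {
            Field field = Pair.class.getDeclaredField(fieldName);
            return field.getType();
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("Unknown field: " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        Pair<String, Integer> pair = new Pair<>("a", 1);
        // The instance holds a String and an Integer ...
        System.out.println(pair.field0.getClass().getName()); // java.lang.String
        System.out.println(pair.field1.getClass().getName()); // java.lang.Integer
        // ... but the declared field types are erased to Object.
        System.out.println(declaredFieldType("field0").getName()); // java.lang.Object
        System.out.println(declaredFieldType("field1").getName()); // java.lang.Object
    }
}
```

This only demonstrates the erasure effect in plain Java. Whether it is the actual cause of the test failure still needs to be checked against the test and the operator code.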

Also to give some context, if you see in the FlinkMaterializedGroupByOperator 
code there are some utility functions we use to map the Wayang UDFs to the 
Flink (or Spark) UDFs. For example, the line:
final KeySelector<Type, KeyType> keyExtractor =
        flinkExecutor.getCompiler().compileKeySelector(this.getKeyDescriptor());
converts the Wayang UDF keyDescriptor to a Flink UDF KeySelector. Maybe if you 
check the code of this class you could spot the problem?
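
To make the adapter idea above concrete, here is a rough, JDK-only sketch of wrapping a platform-neutral key UDF into an engine-style key selector and using it for a materialized group-by. Every name in it (`EngineKeySelector`, `KeyDescriptor`, `compileKeySelector`, `groupBy`) is a hypothetical stand-in chosen for illustration; none of this is the actual Wayang or Flink code, and the real compiler may work differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class KeySelectorSketch {

    // Hypothetical engine-side interface, loosely modeled on a key selector; NOT Flink's type.
    interface EngineKeySelector<T, K> {
        K getKey(T value);
    }

    // Hypothetical platform-neutral key UDF: the function plus an explicitly declared key class.
    static final class KeyDescriptor<T, K> {
        final Function<T, K> udf;
        final Class<K> keyClass;

        KeyDescriptor(Function<T, K> udf, Class<K> keyClass) {
            this.udf = udf;
            this.keyClass = keyClass;
        }
    }

    // Adapter step: wrap the neutral UDF in the interface the engine expects.
    static <T, K> EngineKeySelector<T, K> compileKeySelector(KeyDescriptor<T, K> descriptor) {
        final Function<T, K> udf = descriptor.udf;
        return value -> udf.apply(value);
    }

    // Materialized group-by: collect every element into the list that belongs to its key.
    static <T, K> Map<K, List<T>> groupBy(List<T> input, EngineKeySelector<T, K> selector) {
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T element : input) {
            groups.computeIfAbsent(selector.getKey(element), k -> new ArrayList<>()).add(element);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Group words by their first character; the key class is stated explicitly.
        KeyDescriptor<String, Character> descriptor =
                new KeyDescriptor<>(s -> s.charAt(0), Character.class);
        Map<Character, List<String>> groups =
                groupBy(Arrays.asList("apple", "avocado", "banana"), compileKeySelector(descriptor));
        System.out.println(groups); // {a=[apple, avocado], b=[banana]}
    }
}
```

The sketch carries the key class alongside the function, so that a concrete key type is available to whatever consumes the selector. Whether Wayang's descriptors are used this way in the failing test is an assumption to verify.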
Best
--
Zoi

    On Tuesday, 25 April 2023 at 01:30:20 PM CEST, Michalis Vargiamis
<[email protected]> wrote:

Hello!


I've been working on the missing operator tests for Flink. I've
successfully done the SortOperator and the UnionAllOperator by following
the respective Spark operator tests and changing RddChannel to
DataSetChannel.


I'm having trouble with the tests for other operators though, for
example the FlinkMaterializedGroupByOperator. I tried starting with
SparkMaterializedGroupByOperatorTest and doing the same RddChannel to
DataSetChannel modifications as before, but I get the following error:


[ERROR] org.apache.wayang.flink.operators.FlinkMaterializedGroupByOperatorTest.testExecution  Time elapsed: 1.911 s  <<< ERROR!
org.apache.flink.api.common.InvalidProgramException: Return type PojoType<org.apache.wayang.basic.data.Tuple2, fields = [field0: GenericType<java.lang.Object>, field1: GenericType<java.lang.Object>]> of KeySelector class org.apache.wayang.flink.compiler.KeySelectorFunction is not a valid key type
        at org.apache.wayang.flink.operators.FlinkMaterializedGroupByOperatorTest.testExecution(FlinkMaterializedGroupByOperatorTest.java:50)


Digging into the operator code a bit more, the error happens at

dataSetInput.groupBy(keyExtractor);


Any ideas on what should be changed?


Here is a permalink to the respective Spark test:
[https://github.com/apache/incubator-wayang/blob/6aad4eea8c91a52f2a41e79424491e6c2c5206af/wayang-platforms/wayang-spark/code/test/java/org/apache/wayang/spark/operators/SparkMaterializedGroupByOperatorTest.java]


Thank you,

Michalis Vargiamis
