Hi Michalis,

Please let us know if you need help :)

Best,
Jorge

> On 25 Apr 2023, at 14.07, Zoi Kaoudi <[email protected]> wrote:
> 
> Hi Michalis,
> can you double check that you define the types of the Tuple2 output? 
> 
> 
> According to the error 
> "Return type 
> PojoType<org.apache.wayang.basic.data.Tuple2, fields = [field0: 
> GenericType<java.lang.Object>, field1: GenericType<java.lang.Object>]>", 
> it seems that the Tuple2 fields have no specific types and are treated as 
> java.lang.Object. That could be the problem. 
> 
> Also, for context: the FlinkMaterializedGroupByOperator code uses some 
> utility functions to map the Wayang UDFs to the Flink (or Spark) UDFs. 
> For example, the line:
> final KeySelector<Type, KeyType> keyExtractor =
>         flinkExecutor.getCompiler().compileKeySelector(this.getKeyDescriptor());
> converts the Wayang UDF keyDescriptor to a Flink UDF KeySelector. If you 
> check the code of this class, you may be able to spot the problem.
> Best
> --
> Zoi
> 
>    On Tuesday, 25 April 2023 at 01:30:20 PM CEST, Michalis 
> Vargiamis <[email protected]> wrote:  
> 
> Hello!
> 
> 
> I've been working on the missing operator tests for Flink. I've 
> successfully done the SortOperator and the UnionAllOperator by following 
> the respective Spark operator tests and changing RddChannel to 
> DataSetChannel.
> 
> 
> I'm having trouble with the tests for other operators though, for 
> example the FlinkMaterializedGroupByOperator. I tried starting with 
> SparkMaterializedGroupByOperatorTest and doing the same RddChannel to 
> DataSetChannel modifications as before, but I get the following error:
> 
> 
> [ERROR] org.apache.wayang.flink.operators.FlinkMaterializedGroupByOperatorTest.testExecution  Time elapsed: 1.911 s  <<< ERROR!
> org.apache.flink.api.common.InvalidProgramException: Return type PojoType<org.apache.wayang.basic.data.Tuple2, fields = [field0: GenericType<java.lang.Object>, field1: GenericType<java.lang.Object>]> of KeySelector class org.apache.wayang.flink.compiler.KeySelectorFunction is not a valid key type
>         at org.apache.wayang.flink.operators.FlinkMaterializedGroupByOperatorTest.testExecution(FlinkMaterializedGroupByOperatorTest.java:50)
> 
> 
> Digging into the operator code a bit more, the error happens at
> 
> dataSetInput.groupBy(keyExtractor);
> 
> 
> Any ideas on what should be changed?
> 
> 
> Here is a permalink to the respective Spark test: 
> [https://github.com/apache/incubator-wayang/blob/6aad4eea8c91a52f2a41e79424491e6c2c5206af/wayang-platforms/wayang-spark/code/test/java/org/apache/wayang/spark/operators/SparkMaterializedGroupByOperatorTest.java]
> 
> 
> Thank you,
> 
> Michalis Vargiamis
> 
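
For context on the error quoted above: the GenericType<java.lang.Object> fields are consistent with Java type erasure, where a generic tuple class keeps no record of its type arguments at runtime unless they are passed along explicitly. The sketch below is plain Java with no Flink or Wayang dependency. Pair and KeyDescriptor are hypothetical stand-ins, not the real Tuple2 or Wayang descriptor classes, and the Comparable check is only an assumed approximation of what a key-type validation might look for.

```java
import java.lang.reflect.Field;

public class ErasureSketch {

    // Hypothetical stand-in for a generic two-field tuple (not Wayang's Tuple2).
    public static class Pair<T0, T1> {
        public T0 field0;
        public T1 field1;

        public Pair() { }

        public Pair(T0 field0, T1 field1) {
            this.field0 = field0;
            this.field1 = field1;
        }
    }

    // Returns the field type that reflection can recover from the raw class.
    // For a field declared with an unbounded type variable this is Object.
    public static Class<?> erasedFieldType(Class<?> cls, String fieldName) {
        try {
            Field field = cls.getDeclaredField(fieldName);
            return field.getType();
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No such field: " + fieldName, e);
        }
    }

    // Hypothetical descriptor that carries the key type as an explicit Class token,
    // so the type survives erasure.
    public static final class KeyDescriptor<K> {
        private final Class<K> keyClass;

        public KeyDescriptor(Class<K> keyClass) {
            this.keyClass = keyClass;
        }

        public Class<K> getKeyClass() {
            return keyClass;
        }

        // Assumed approximation of a key-type check: the key must be Comparable.
        public boolean isComparableKey() {
            return Comparable.class.isAssignableFrom(keyClass);
        }
    }

    public static void main(String[] args) {
        Pair<String, Integer> pair = new Pair<>("a", 1);

        // The declared type arguments <String, Integer> are not visible here.
        System.out.println(erasedFieldType(pair.getClass(), "field0"));

        // With an explicit Class token the key type is known and can be checked.
        System.out.println(new KeyDescriptor<>(String.class).isComparableKey());
        System.out.println(new KeyDescriptor<>(Object.class).isComparableKey());
    }
}
```

If this is what is happening in the test, passing the concrete key class (for example String.class) where the key descriptor is built, rather than relying on an unchecked Tuple2 type, may be worth trying. That is a guess based on the error text, not a confirmed fix.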
