Hi Michalis, Please let us know if you need help :)
Best, Jorge > On 25 Apr 2023, at 14.07, Zoi Kaoudi <[email protected]> wrote: > > Hi Michalis, > can you double check that you define the types of the Tuple2 output? > > > According to the error > "Return type > PojoType<org.apache.wayang.basic.data.Tuple2, fields = [field0: > GenericType<java.lang.Object>, field1: GenericType<java.lang.Object>]>"it > seems like the Tuple2 does not have specific types but they are > java.lang.Objects. Maybe that could be the problem. > > Also to give some context, if you see in the FlinkMaterializedGroupByOperator > code there are some utility functions we use to map the Wayang UDFs to the > Flink (or Spark) UDFs. For example, the line: > final KeySelector<Type, KeyType> keyExtractor = > flinkExecutor.getCompiler().compileKeySelector(this.getKeyDescriptor()); > converts the Wayang UDF keyDescriptor to a Flink UDF KeySelector. Maybe if > you check the code of this class you could spot the problem? > Best > -- > Zoi > > Στις Τρίτη 25 Απριλίου 2023 στις 01:30:20 μ.μ. CEST, ο χρήστης Michalis > Vargiamis <[email protected]> έγραψε: > > Hello! > > > I've been working with the missing operator tests for Flink. I've > successfully done the SortOperator and the UnionAllOperator by seeing > the respective Spark operator tests and modifying RddChannel to > DataSetChannel. > > > I'm having trouble with the tests for other operators though, for > example the FlinkMaterializedGroupByOperator. I tried starting with > SparkMaterializedGroupByOperatorTest and doing the same RddChannel to > DataSetChannel modifications as before, but I get the following error: > > > [ERROR] > org.apache.wayang.flink.operators.FlinkMaterializedGroupByOperatorTest.testExecution > > Time elapsed: 1.911 s <<< ERROR! > org.apache.flink.api.common.InvalidProgramException: Return type > PojoType<org.apache.wayang.basic.data.Tuple2, fields = [field0: > GenericType<java.lang.Object>, field1: GenericType<java.lang.Object>]> > of KeySelector class > org.apache.wayang.flink.compiler.KeySelectorFunction is not a valid key type > at > org.apache.wayang.flink.operators.FlinkMaterializedGroupByOperatorTest.testExecution(FlinkMaterializedGroupByOperatorTest.java:50) > > > Digging into the operator code a bit more, the error happens at > > dataSetInput.groupBy(keyExtractor); > > > Any ideas on what should be changed? > > > Here is a permalink to the respective spark test > [https://github.com/apache/incubator-wayang/blob/6aad4eea8c91a52f2a41e79424491e6c2c5206af/wayang-platforms/wayang-spark/code/test/java/org/apache/wayang/spark/operators/SparkMaterializedGroupByOperatorTest.java] > > > Thank you, > > Michalis Vargiamis >
