I think I tried that and it didn't work, but I can't remember for sure (I'm about 80% sure; sorry for the uncertainty, I've been trying many things for various problems). However, if I understand this solution correctly, it means I would have to create one of these join classes for every type I want to join. Is that right?
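
To make the question concrete, here is a minimal sketch of what plain Java reflection can see for the three shapes of joined class discussed in this thread. It uses only java.lang.reflect, not Beam, so it illustrates the underlying Java behaviour and is not a description of how Beam's schema inference is implemented. All names in it (FieldTypeDemo, elementTypeOf, LhsType, RawJoined, ConcreteJoined, GenericJoined) are hypothetical stand-ins.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

// Illustration only: plain reflection, no Beam classes involved.
class FieldTypeDemo {
    // Reports what reflection can learn about the element type of a public field.
    static String elementTypeOf(Class<?> owner, String fieldName) {
        Type fieldType;
        try {
            fieldType = owner.getField(fieldName).getGenericType();
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No public field " + fieldName, e);
        }
        // A raw Iterable is reported as a plain Class with no type arguments.
        if (!(fieldType instanceof ParameterizedType)) {
            return "unknown (raw type)";
        }
        // Iterable<X> is a ParameterizedType, which is not a Class,
        // so casting it to Class would throw ClassCastException.
        Type arg = ((ParameterizedType) fieldType).getActualTypeArguments()[0];
        if (arg instanceof TypeVariable) {
            return "unresolved type variable " + ((TypeVariable<?>) arg).getName();
        }
        return arg.getTypeName();
    }

    public static void main(String[] args) {
        System.out.println(elementTypeOf(RawJoined.class, "lhs"));
        System.out.println(elementTypeOf(ConcreteJoined.class, "lhs"));
        System.out.println(elementTypeOf(GenericJoined.class, "lhs"));
    }
}

// Hypothetical stand-in for one side of the join.
class LhsType { public String name; }

// Raw Iterable: the element type is not recorded anywhere.
class RawJoined { public Iterable lhs; }

// Element type filled in on the field itself.
class ConcreteJoined { public Iterable<LhsType> lhs; }

// Element type is a type variable of the enclosing generic class.
class GenericJoined<L> { public Iterable<L> lhs; }
```

Only the ConcreteJoined variant exposes the element type on the field itself, which is why one such class per pair of joined types seems to be needed as long as generic classes are not supported.
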
On Wed, Sep 15, 2021 at 5:56 PM Reuven Lax <re...@google.com> wrote:

> Could you actually fill in the generic type for Iterable? e.g.
> Iterable<LhsType> lhs; I think without that, the schema won't match.
>
> On Wed, Sep 15, 2021 at 2:52 PM Cristian Constantinescu <zei...@gmail.com>
> wrote:
>
>> Hi Reuven,
>>
>> Thanks for getting back to me.
>>
>> To answer your question, my initial Joined pojo is:
>>
>> @DefaultSchema(JavaFieldSchema.class)
>> public class JoinedValue {
>>     public JoinedKey key;
>>
>>     public Iterable lhs;
>>     public Iterable rhs;
>> }
>>
>> Which is exactly the same as the documentation page, minus the field
>> names. This is my main concern: following the steps in the documentation
>> does not work when running the pipeline. I'll try to set up a sample
>> project to illustrate this if you think it would be helpful.
>>
>> On Wed, Sep 15, 2021 at 1:08 PM Reuven Lax <re...@google.com> wrote:
>>
>>> On Sun, Sep 12, 2021 at 7:01 PM Cristian Constantinescu
>>> <zei...@gmail.com> wrote:
>>>
>>>> Hello everyone,
>>>>
>>>> As I'm continuing to remove my usage of Row and replacing it with
>>>> Pojos, I'm following the documentation for the CoGroup transform [1].
>>>>
>>>> As per the documentation, I have created a JoinedKey and a JoinedValue,
>>>> exactly as the examples given in the documentation except that the key
>>>> has propA, B and C.
>>>>
>>>> I then execute this code:
>>>> PCollectionTuple.of("lhs", lhs).and("rhs", rhs)
>>>>     .apply(Cogroupby...)
>>>>     .apply(Convert.to(Joined.class))
>>>>
>>>> And I get this:
>>>> Exception in thread "main" java.lang.RuntimeException: Cannot convert
>>>> between types that don't have equivalent schemas.
>>>> input schema: Fields:
>>>> Field{name=key, description=, type=ROW<propA STRING, propB STRING,
>>>> propC STRING> NOT NULL, options={{}}}
>>>> Field{name=lhs, description=, type=ITERABLE NOT NULL, options={{}}}
>>>> Field{name=rhs, description=, type=ITERABLE NOT NULL, options={{}}}
>>>> Encoding positions:
>>>> {lhs=1, rhs=2, key=0}
>>>> Options:{{}}UUID: 40cef697-152b-4f3b-9110-e32a921915ca output schema:
>>>> Fields:
>>>> Field{name=key, description=, type=ROW<propA STRING, propB STRING,
>>>> propC STRING> NOT NULL, options={{}}}
>>>> Field{name=lhs, description=, type=ITERABLE NOT NULL, options={{}}}
>>>> Field{name=rhs, description=, type=ITERABLE NOT NULL, options={{}}}
>>>> Encoding positions:
>>>> {lhs=1, rhs=2, key=0}
>>>>
>>>> This is probably because lhs and rhs are Iterable and it's trying to
>>>> compare the schemas from the CoGroup Rows for lhs and rhs and the
>>>> Iterable properties from the Joined pojo. We should update the
>>>> documentation as it doesn't reflect how the code actually behaves
>>>> (unless I missed something).
>>>
>>> I'm not sure I understand the issue here. What exactly does your Joined
>>> pojo look like?
>>>
>>>> My next step was to try to make the Joined Pojo generic. Like this:
>>>> @DefaultSchema(JavaFieldSchema.class)
>>>> public class Joined<Lhs, Rhs> {
>>>>     public JoinedKey key;
>>>>     public Iterable<Lhs> lhs;
>>>>     public Iterable<Rhs> rhs;
>>>> }
>>>
>>> Unfortunately schema inference doesn't work today with generic classes.
>>> I believe that it's possible to fix this (e.g. we do support Coder
>>> inference in such cases), but today this won't work.
>>>
>>>> And then execute this code:
>>>>
>>>> var joinedTypeDescriptor =
>>>>     new TypeDescriptor<Joined<MyLhsPojo, MyRhsPojo>>(){};
>>>>
>>>> var keyCoder = SchemaCoder.of(keySchema,
>>>>     TypeDescriptor.of(JoinedKey.class),
>>>>     new JavaFieldSchema().toRowFunction(TypeDescriptor.of(JoinedKey.class)),
>>>>     new JavaFieldSchema().fromRowFunction(TypeDescriptor.of(JoinedKey.class)));
>>>> var valueCoder = SchemaCoder.of(keySchema,
>>>>     joinedTypeDescriptor,
>>>>     new JavaFieldSchema().toRowFunction(joinedTypeDescriptor),
>>>>     new JavaFieldSchema().fromRowFunction(joinedTypeDescriptor));
>>>> var mixed = PCollectionTuple.of("lhs", lhs).and("rhs", rhs)
>>>>     .apply(Cogroupby...)
>>>>     .apply(Convert.to(joinedTypeDescriptor))
>>>>
>>>> But this gives me:
>>>> Exception in thread "main" java.lang.ClassCastException: class
>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types$ParameterizedTypeImpl
>>>> cannot be cast to class java.lang.Class
>>>> (org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types$ParameterizedTypeImpl
>>>> is in unnamed module of loader 'app'; java.lang.Class is in module
>>>> java.base of loader 'bootstrap')
>>>>     at org.apache.beam.sdk.schemas.GetterBasedSchemaProvider.fromRowFunction(GetterBasedSchemaProvider.java:105)
>>>> (irrelevant stacktrace omitted for brevity)
>>>>
>>>> It looks like GetterBasedSchemaProvider.fromRowFunction has an explicit
>>>> cast to "Class" but there could be instances where a guava type is
>>>> passed in.
>>>>
>>>> So my workaround for now, as elegant as a roadkill, is to do things
>>>> manually as below. (actual class names replaced with RhsPojo)
>>>>
>>>> var mixed = PCollectionTuple.of("lhs", lhs).and("rhs", rhs)
>>>>     .apply(Cogroupby...)
>>>>     // .apply(Convert.to(joinedTypeDescriptor))
>>>>     .apply(MapElements.into(joinedTypeDescriptor).via(x -> {
>>>>         var key = new JavaFieldSchema()
>>>>             .fromRowFunction(TypeDescriptor.of(JoinedKey.class))
>>>>             .apply(x.getRow("key"));
>>>>
>>>>         var lhsList = new ArrayList<LhsType>();
>>>>         var rowJoinedSerializableFunction =
>>>>             new JavaFieldSchema().fromRowFunction(lhsTypeDescriptor);
>>>>         for (var item : x.<Row>getIterable("lhs")) {
>>>>             var lhsItem = rowJoinedSerializableFunction.apply(item);
>>>>             lhsList.add(lhsItem);
>>>>         }
>>>>
>>>>         var rhsList = new ArrayList<RhsPojo>();
>>>>         var rhsFromRowFn =
>>>>             new JavaFieldSchema().fromRowFunction(TypeDescriptor.of(RhsPojo.class));
>>>>         for (var item : x.<Row>getIterable("rhs")) {
>>>>             var rhsItem = rhsFromRowFn.apply(item);
>>>>             rhsList.add(rhsItem);
>>>>         }
>>>>
>>>>         var joined = new Joined<LhsType, RhsPojo>();
>>>>
>>>>         joined.key = key;
>>>>         joined.lhs = lhsList;
>>>>         joined.rhs = rhsList;
>>>>         return joined;
>>>>     }));
>>>>
>>>> Please let me know how this whole thing should behave. As with the
>>>> other email I sent earlier, I'll be happy to fix things (if they need
>>>> fixing) once I deliver some stuff for work. BTW, this is using Beam 2.31.
>>>>
>>>> Thank you for your time reading these walls of text I'm sending, and
>>>> for your advice!
>>>>
>>>> Cheers,
>>>> Cristian
>>>>
>>>> [1]
>>>> http://beam.apache.org/releases/javadoc/2.31.0/org/apache/beam/sdk/schemas/transforms/CoGroup.html