I'll give it another shot and let you know if it works. I could update the documentation accordingly afterwards.
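For readers following along: Reuven's suggestion further down is to declare the fields as Iterable<LhsType> rather than a raw Iterable. A raw field carries no element type that reflection can see. The plain-JDK sketch here illustrates that difference. The class names are hypothetical. It also assumes, without showing it, that Beam's JavaFieldSchema infers an Iterable's element schema from the field's generic type in a comparable way. That assumption would be consistent with the bare "type=ITERABLE" in the error output quoted below.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class IterableFieldTypes {

    // Hypothetical element types standing in for the real left/right POJOs.
    public static class LhsPojo { public String propA; }
    public static class RhsPojo { public String propA; }

    // Same shape as the documented JoinedValue: raw Iterable fields.
    @SuppressWarnings("rawtypes")
    public static class RawJoined {
        public Iterable lhs;
        public Iterable rhs;
    }

    // Same shape with the element types filled in, as suggested in the thread.
    public static class TypedJoined {
        public Iterable<LhsPojo> lhs;
        public Iterable<RhsPojo> rhs;
    }

    /** Returns the declared element type of a public Iterable field, or null if the field is raw. */
    public static Type elementType(Class<?> owner, String fieldName) {
        try {
            Type declared = owner.getField(fieldName).getGenericType();
            if (declared instanceof ParameterizedType) {
                return ((ParameterizedType) declared).getActualTypeArguments()[0];
            }
            return null; // raw Iterable: reflection only sees Iterable.class, no element type
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No public field named " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println("raw lhs element:   " + elementType(RawJoined.class, "lhs"));
        System.out.println("typed lhs element: " + elementType(TypedJoined.class, "lhs"));
    }
}
```

If this reading is right, filling in the element types means writing one concrete join class per pair of joined types. That is the cost Cristian asks about in the thread, and it is the gap that generic schema inference would close.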
On Wed, Sep 15, 2021 at 6:08 PM Reuven Lax <re...@google.com> wrote:

> Yes, you would; that's the rationale for adding support for generic types
> in schema inference.
>
> On Wed, Sep 15, 2021 at 3:06 PM Cristian Constantinescu <zei...@gmail.com> wrote:
>
>> I think I tried that, but can't remember for sure (I'm about 80% sure;
>> sorry for the uncertainty, I've been trying many things for various
>> problems), and it didn't work. However, if I understand this solution
>> correctly, it means I would have to create these join classes for
>> every type I want to join. Is that right?
>>
>> On Wed, Sep 15, 2021 at 5:56 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Could you actually fill in the generic type for Iterable, e.g. declare
>>> the field as Iterable<LhsType> lhs? I think without that, the schema
>>> won't match.
>>>
>>> On Wed, Sep 15, 2021 at 2:52 PM Cristian Constantinescu <zei...@gmail.com> wrote:
>>>
>>>> Hi Reuven,
>>>>
>>>> Thanks for getting back to me.
>>>>
>>>> To answer your question, my initial Joined pojo is:
>>>>
>>>> @DefaultSchema(JavaFieldSchema.class)
>>>> public class JoinedValue {
>>>>     public JoinedKey key;
>>>>
>>>>     public Iterable lhs;
>>>>     public Iterable rhs;
>>>> }
>>>>
>>>> This is exactly the same as the documentation page, minus the field
>>>> names. This is mainly my concern: following the steps in the
>>>> documentation does not work when running the pipeline. I'll try to set
>>>> up a sample project to illustrate this if you think it would be helpful.
>>>>
>>>> On Wed, Sep 15, 2021 at 1:08 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>>
>>>>> On Sun, Sep 12, 2021 at 7:01 PM Cristian Constantinescu <zei...@gmail.com> wrote:
>>>>>
>>>>>> Hello everyone,
>>>>>>
>>>>>> As I'm continuing to remove my usage of Row and replace it with
>>>>>> Pojos, I'm following the documentation for the CoGroup transform [1].
>>>>>>
>>>>>> As per the documentation, I have created a JoinedKey and a
>>>>>> JoinedValue, exactly like the examples given in the documentation,
>>>>>> except that the key has propA, propB and propC.
>>>>>>
>>>>>> I then execute this code:
>>>>>> PCollectionTuple.of("lhs", lhs).and("rhs", rhs)
>>>>>>     .apply(CoGroup...)
>>>>>>     .apply(Convert.to(Joined.class))
>>>>>>
>>>>>> And I get this:
>>>>>> Exception in thread "main" java.lang.RuntimeException: Cannot convert
>>>>>> between types that don't have equivalent schemas. input schema: Fields:
>>>>>> Field{name=key, description=, type=ROW<propA STRING, propB STRING, propC STRING> NOT NULL, options={{}}}
>>>>>> Field{name=lhs, description=, type=ITERABLE NOT NULL, options={{}}}
>>>>>> Field{name=rhs, description=, type=ITERABLE NOT NULL, options={{}}}
>>>>>> Encoding positions:
>>>>>> {lhs=1, rhs=2, key=0}
>>>>>> Options:{{}}UUID: 40cef697-152b-4f3b-9110-e32a921915ca output schema:
>>>>>> Fields:
>>>>>> Field{name=key, description=, type=ROW<propA STRING, propB STRING, propC STRING> NOT NULL, options={{}}}
>>>>>> Field{name=lhs, description=, type=ITERABLE NOT NULL, options={{}}}
>>>>>> Field{name=rhs, description=, type=ITERABLE NOT NULL, options={{}}}
>>>>>> Encoding positions:
>>>>>> {lhs=1, rhs=2, key=0}
>>>>>>
>>>>>> This is probably because lhs and rhs are Iterable, and it's trying to
>>>>>> compare the schemas of the CoGroup Rows for lhs and rhs with the
>>>>>> Iterable properties of the Joined pojo. We should update the
>>>>>> documentation, as it doesn't reflect how the code actually behaves
>>>>>> (unless I missed something).
>>>>>>
>>>>>
>>>>> I'm not sure I understand the issue here. What exactly does your
>>>>> Joined pojo look like?
>>>>>
>>>>>>
>>>>>> My next step was to try to make the Joined Pojo generic.
>>>>>> Like this:
>>>>>> @DefaultSchema(JavaFieldSchema.class)
>>>>>> public class Joined<Lhs, Rhs> {
>>>>>>     public JoinedKey key;
>>>>>>     public Iterable<Lhs> lhs;
>>>>>>     public Iterable<Rhs> rhs;
>>>>>> }
>>>>>>
>>>>>
>>>>> Unfortunately, schema inference doesn't work today with generic
>>>>> classes. I believe that it's possible to fix this (e.g. we do support
>>>>> Coder inference in such cases), but today this won't work.
>>>>>
>>>>>>
>>>>>> And then execute this code:
>>>>>>
>>>>>> var joinedTypeDescriptor =
>>>>>>     new TypeDescriptor<Joined<MyLhsPojo, MyRhsPojo>>() {};
>>>>>>
>>>>>> var keyCoder = SchemaCoder.of(keySchema,
>>>>>>     TypeDescriptor.of(JoinedKey.class),
>>>>>>     new JavaFieldSchema().toRowFunction(TypeDescriptor.of(JoinedKey.class)),
>>>>>>     new JavaFieldSchema().fromRowFunction(TypeDescriptor.of(JoinedKey.class)));
>>>>>> var valueCoder = SchemaCoder.of(keySchema,
>>>>>>     joinedTypeDescriptor,
>>>>>>     new JavaFieldSchema().toRowFunction(joinedTypeDescriptor),
>>>>>>     new JavaFieldSchema().fromRowFunction(joinedTypeDescriptor));
>>>>>> var mixed = PCollectionTuple.of("lhs", lhs).and("rhs", rhs)
>>>>>>     .apply(CoGroup...)
>>>>>>     .apply(Convert.to(joinedTypeDescriptor));
>>>>>>
>>>>>> But this gives me:
>>>>>> Exception in thread "main" java.lang.ClassCastException: class
>>>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types$ParameterizedTypeImpl
>>>>>> cannot be cast to class java.lang.Class
>>>>>> (org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types$ParameterizedTypeImpl
>>>>>> is in unnamed module of loader 'app'; java.lang.Class is in module
>>>>>> java.base of loader 'bootstrap')
>>>>>>     at org.apache.beam.sdk.schemas.GetterBasedSchemaProvider.fromRowFunction(GetterBasedSchemaProvider.java:105)
>>>>>>     (irrelevant stacktrace omitted for brevity)
>>>>>>
>>>>>> It looks like GetterBasedSchemaProvider.fromRowFunction has an
>>>>>> explicit cast to Class, but there can be cases where a Guava
>>>>>> ParameterizedType implementation is passed in instead.
>>>>>>
>>>>>> So my workaround for now, as elegant as roadkill, is to do things
>>>>>> manually as below (actual class names replaced with RhsPojo):
>>>>>>
>>>>>> var mixed = PCollectionTuple.of("lhs", lhs).and("rhs", rhs)
>>>>>>     .apply(CoGroup...)
>>>>>>     // .apply(Convert.to(joinedTypeDescriptor))
>>>>>>     .apply(MapElements.into(joinedTypeDescriptor).via(x -> {
>>>>>>         var key = new JavaFieldSchema()
>>>>>>             .fromRowFunction(TypeDescriptor.of(JoinedKey.class))
>>>>>>             .apply(x.getRow("key"));
>>>>>>
>>>>>>         var lhsList = new ArrayList<LhsType>();
>>>>>>         var lhsFromRowFn = new JavaFieldSchema().fromRowFunction(lhsTypeDescriptor);
>>>>>>         for (var item : x.<Row>getIterable("lhs")) {
>>>>>>             var lhsItem = lhsFromRowFn.apply(item);
>>>>>>             lhsList.add(lhsItem);
>>>>>>         }
>>>>>>
>>>>>>         var rhsList = new ArrayList<RhsPojo>();
>>>>>>         var rhsFromRowFn = new JavaFieldSchema().fromRowFunction(TypeDescriptor.of(RhsPojo.class));
>>>>>>         for (var item : x.<Row>getIterable("rhs")) {
>>>>>>             var rhsItem = rhsFromRowFn.apply(item);
>>>>>>             rhsList.add(rhsItem);
>>>>>>         }
>>>>>>
>>>>>>         var joined = new Joined<LhsType, RhsPojo>();
>>>>>>         joined.key = key;
>>>>>>         joined.lhs = lhsList;
>>>>>>         joined.rhs = rhsList;
>>>>>>         return joined;
>>>>>>     }));
>>>>>>
>>>>>> Please let me know how this whole thing should behave. As with the
>>>>>> other email I sent earlier, I'll be happy to fix things (if they need
>>>>>> fixing) once I deliver some stuff for work. BTW, this is using Beam 2.31.
>>>>>>
>>>>>> Thank you for your time reading these walls of text, and for your advice!
>>>>>>
>>>>>> Cheers,
>>>>>> Cristian
>>>>>>
>>>>>> [1]
>>>>>> http://beam.apache.org/releases/javadoc/2.31.0/org/apache/beam/sdk/schemas/transforms/CoGroup.html
>>>>>>
>>>>>
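The ClassCastException in this thread can be reproduced in miniature with the JDK alone. A type captured through an anonymous subclass, as new TypeDescriptor<Joined<MyLhsPojo, MyRhsPojo>>() {} does, comes back as a ParameterizedType when the type is generic. Any code that casts it straight to Class therefore fails. The TypeToken class here is a hypothetical stand-in and is neither Beam's TypeDescriptor nor Guava's TypeToken. The claim that GetterBasedSchemaProvider.fromRowFunction performs such a cast is an assumption taken from the stack trace, not from reading Beam's source.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class TypeTokenCast {

    /**
     * Hypothetical minimal "super type token": an anonymous subclass records its
     * type argument. This is the same trick behind new TypeDescriptor<...>() {},
     * but it is not Beam's or Guava's class.
     */
    public abstract static class TypeToken<T> {
        public final Type type;

        protected TypeToken() {
            // For an anonymous subclass, the generic superclass is TypeToken<T> with T filled in.
            ParameterizedType superclass = (ParameterizedType) getClass().getGenericSuperclass();
            this.type = superclass.getActualTypeArguments()[0];
        }
    }

    /** Returns true if an unconditional (Class<?>) cast of t succeeds. */
    public static boolean castsToClass(Type t) {
        try {
            Class<?> raw = (Class<?>) t; // throws ClassCastException for a ParameterizedType
            return raw != null;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Type plain = new TypeToken<String>() {}.type;
        Type generic = new TypeToken<List<String>>() {}.type;

        System.out.println(castsToClass(plain));   // true: String is a plain Class
        System.out.println(castsToClass(generic)); // false: List<String> is a ParameterizedType
    }
}
```

Non-generic types such as JoinedKey.class pass this kind of cast. That fits the workaround above, where fromRowFunction works for JoinedKey and RhsPojo but not for the generic Joined type.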