I'll give it another shot and let you know if it works. I could update the
documentation accordingly afterwards.

On Wed, Sep 15, 2021 at 6:08 PM Reuven Lax <re...@google.com> wrote:

> Yes you would - that's the rationale for adding support for generic types
> in schema inference.
>
> On Wed, Sep 15, 2021 at 3:06 PM Cristian Constantinescu <zei...@gmail.com>
> wrote:
>
>> I think I tried that, but I can't remember for sure (I'm about 80% sure;
>> sorry for the uncertainty, I've been trying many things for various
>> problems), and it didn't work. However, if I understand this solution
>> correctly, it means I would have to create one of these join classes for
>> every type I want to join. Is that right?
>>
>> On Wed, Sep 15, 2021 at 5:56 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Could you actually fill in the generic type for Iterable, e.g.
>>> Iterable<LhsType> lhs? I think without that, the schema won't match.
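Plain Java reflection shows why the element type matters, independent of Beam. A raw Iterable field reports only Iterable.class. Iterable<LhsType> keeps its element type, which is the information schema inference would need to produce something like ITERABLE<ROW<...>> instead of the bare ITERABLE in the error message. RawJoined, TypedJoined and LhsType are hypothetical stand-ins. This is a sketch of the reflection involved, not Beam's inference code.

```java
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class IterableElementTypeDemo {
  // Hypothetical element type standing in for the real LHS POJO.
  static class LhsType {
    public String propA;
  }

  // Mirrors the original JoinedValue: a raw Iterable with no element type.
  @SuppressWarnings("rawtypes")
  static class RawJoined {
    public Iterable lhs;
  }

  // The suggested fix: the element type is spelled out.
  static class TypedJoined {
    public Iterable<LhsType> lhs;
  }

  /** Returns the element type of a public Iterable field, or null if the field is raw. */
  static Type elementType(Class<?> owner, String fieldName) {
    Field field;
    try {
      field = owner.getField(fieldName);
    } catch (NoSuchFieldException e) {
      throw new IllegalArgumentException(e);
    }
    Type declared = field.getGenericType();
    if (declared instanceof ParameterizedType) {
      return ((ParameterizedType) declared).getActualTypeArguments()[0];
    }
    // A raw Iterable is reported as plain Iterable.class: the element type is gone.
    return null;
  }

  public static void main(String[] args) {
    System.out.println("raw:   " + elementType(RawJoined.class, "lhs"));
    System.out.println("typed: " + elementType(TypedJoined.class, "lhs"));
  }
}
```

Running it prints null for the raw field and the LhsType class for the typed one.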
>>>
>>> On Wed, Sep 15, 2021 at 2:52 PM Cristian Constantinescu <
>>> zei...@gmail.com> wrote:
>>>
>>>> Hi Reuven,
>>>>
>>>> Thanks for getting back to me.
>>>>
>>>> To answer your question, my initial Joined POJO is:
>>>>
>>>> @DefaultSchema(JavaFieldSchema.class)
>>>> public class JoinedValue {
>>>>   public JoinedKey key;
>>>>
>>>>   public Iterable lhs;
>>>>   public Iterable rhs;
>>>> }
>>>>
>>>>
>>>> This is exactly what the documentation page shows, minus the field
>>>> names. That is my main concern: following the steps in the documentation
>>>> does not work when running the pipeline. I'll try to set up a sample
>>>> project to illustrate this if you think it would be helpful.
>>>>
>>>> On Wed, Sep 15, 2021 at 1:08 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Sun, Sep 12, 2021 at 7:01 PM Cristian Constantinescu <
>>>>> zei...@gmail.com> wrote:
>>>>>
>>>>>> Hello everyone,
>>>>>>
>>>>>> As I continue replacing my usage of Row with POJOs, I'm following the
>>>>>> documentation for the CoGroup transform [1].
>>>>>>
>>>>>> As per the documentation, I have created a JoinedKey and a
>>>>>> JoinedValue, exactly like the examples given in the documentation, except
>>>>>> that the key has propA, propB and propC.
>>>>>>
>>>>>> I then execute this code:
>>>>>> PCollectionTuple.of("lhs", lhs).and("rhs", rhs)
>>>>>>     .apply(Cogroupby...)
>>>>>>     .apply(Convert.to(Joined.class))
>>>>>>
>>>>>> And I get this:
>>>>>> Exception in thread "main" java.lang.RuntimeException: Cannot convert
>>>>>> between types that don't have equivalent schemas. input schema: Fields:
>>>>>> Field{name=key, description=, type=ROW<propA STRING, propB STRING,
>>>>>> propC STRING> NOT NULL, options={{}}}
>>>>>> Field{name=lhs, description=, type=ITERABLE NOT NULL, options={{}}}
>>>>>> Field{name=rhs, description=, type=ITERABLE NOT NULL, options={{}}}
>>>>>> Encoding positions:
>>>>>> {lhs=1, rhs=2, key=0}
>>>>>> Options:{{}}UUID: 40cef697-152b-4f3b-9110-e32a921915ca output schema:
>>>>>> Fields:
>>>>>> Field{name=key, description=, type=ROW<propA STRING, propB STRING,
>>>>>> propC STRING> NOT NULL, options={{}}}
>>>>>> Field{name=lhs, description=, type=ITERABLE NOT NULL, options={{}}}
>>>>>> Field{name=rhs, description=, type=ITERABLE NOT NULL, options={{}}}
>>>>>> Encoding positions:
>>>>>> {lhs=1, rhs=2, key=0}
>>>>>>
>>>>>> This is probably because lhs and rhs are raw Iterables, so it ends up
>>>>>> comparing the schemas of the CoGroup Rows for lhs and rhs against the
>>>>>> untyped Iterable properties of the Joined POJO. We should update the
>>>>>> documentation, as it doesn't reflect how the code actually behaves (unless
>>>>>> I missed something).
>>>>>>
>>>>>
>>>>> I'm not sure I understand the issue here. What exactly does your
>>>>> Joined POJO look like?
>>>>>
>>>>>>
>>>>>> My next step was to make the Joined POJO generic, like this:
>>>>>> @DefaultSchema(JavaFieldSchema.class)
>>>>>> public class Joined<Lhs, Rhs> {
>>>>>>     public JoinedKey key;
>>>>>>     public Iterable<Lhs> lhs;
>>>>>>     public Iterable<Rhs> rhs;
>>>>>> }
>>>>>>
>>>>>
>>>>> Unfortunately, schema inference doesn't work today with generic
>>>>> classes. I believe it's possible to fix this (e.g. we do support Coder
>>>>> inference in such cases), but today this won't work.
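The following is a minimal, stdlib-only illustration of the kind of type-variable resolution that generic schema inference would need. It binds Joined's declared type variables (Lhs, Rhs) to the concrete arguments captured by an anonymous-subclass type token, then substitutes them into the field types. TypeToken, MyLhsPojo and MyRhsPojo are hypothetical. The thread's new TypeDescriptor<...>(){} uses the same anonymous-subclass idea, but this is not Beam's implementation. The sketch only handles element types that are directly a type variable.

```java
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.HashMap;
import java.util.Map;

public class GenericFieldResolution {
  // The generic join class from the thread.
  static class Joined<Lhs, Rhs> {
    public Iterable<Lhs> lhs;
    public Iterable<Rhs> rhs;
  }

  // Hypothetical concrete element types.
  static class MyLhsPojo {}
  static class MyRhsPojo {}

  // Hypothetical type token: an anonymous subclass records T in its generic superclass.
  abstract static class TypeToken<T> {
    final Type type =
        ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
  }

  static ParameterizedType sampleJoinedType() {
    return (ParameterizedType) new TypeToken<Joined<MyLhsPojo, MyRhsPojo>>() {}.type;
  }

  /** Resolves an Iterable field's element type against a concrete parameterization. */
  static Type resolveElementType(ParameterizedType owner, String fieldName) {
    Class<?> raw = (Class<?>) owner.getRawType();
    // Bind each declared type variable (Lhs, Rhs) to its concrete argument.
    Map<TypeVariable<?>, Type> bindings = new HashMap<>();
    TypeVariable<?>[] vars = raw.getTypeParameters();
    Type[] actual = owner.getActualTypeArguments();
    for (int i = 0; i < vars.length; i++) {
      bindings.put(vars[i], actual[i]);
    }
    Field field;
    try {
      field = raw.getField(fieldName);
    } catch (NoSuchFieldException e) {
      throw new IllegalArgumentException(e);
    }
    Type element = ((ParameterizedType) field.getGenericType()).getActualTypeArguments()[0];
    // Substitute the bound argument; leave anything else unchanged.
    return bindings.getOrDefault(element, element);
  }

  public static void main(String[] args) {
    ParameterizedType joined = sampleJoinedType();
    System.out.println("lhs element: " + resolveElementType(joined, "lhs"));
    System.out.println("rhs element: " + resolveElementType(joined, "rhs"));
  }
}
```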
>>>>>
>>>>>
>>>>>>
>>>>>> And then execute this code:
>>>>>>
>>>>>> var joinedTypeDescriptor =
>>>>>>     new TypeDescriptor<Joined<MyLhsPojo, MyRhsPojo>>() {};
>>>>>>
>>>>>> var keyCoder = SchemaCoder.of(
>>>>>>     keySchema,
>>>>>>     TypeDescriptor.of(JoinedKey.class),
>>>>>>     new JavaFieldSchema().toRowFunction(TypeDescriptor.of(JoinedKey.class)),
>>>>>>     new JavaFieldSchema().fromRowFunction(TypeDescriptor.of(JoinedKey.class)));
>>>>>> var valueCoder = SchemaCoder.of(
>>>>>>     keySchema,
>>>>>>     joinedTypeDescriptor,
>>>>>>     new JavaFieldSchema().toRowFunction(joinedTypeDescriptor),
>>>>>>     new JavaFieldSchema().fromRowFunction(joinedTypeDescriptor));
>>>>>> var mixed = PCollectionTuple.of("lhs", lhs).and("rhs", rhs)
>>>>>>     .apply(Cogroupby...)
>>>>>>     .apply(Convert.to(joinedTypeDescriptor));
>>>>>>
>>>>>> But this gives me:
>>>>>> Exception in thread "main" java.lang.ClassCastException: class
>>>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types$ParameterizedTypeImpl
>>>>>> cannot be cast to class java.lang.Class
>>>>>> (org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types$ParameterizedTypeImpl
>>>>>> is in unnamed module of loader 'app'; java.lang.Class is in module
>>>>>> java.base of loader 'bootstrap')
>>>>>> at
>>>>>> org.apache.beam.sdk.schemas.GetterBasedSchemaProvider.fromRowFunction(GetterBasedSchemaProvider.java:105)
>>>>>> (irrelevant stacktrace omitted for brevity)
>>>>>>
>>>>>> It looks like GetterBasedSchemaProvider.fromRowFunction has an
>>>>>> explicit cast to Class, but for a generic TypeDescriptor the underlying
>>>>>> type is a Guava ParameterizedTypeImpl, not a Class.
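The failing cast can be reproduced without Beam. The declared type of a parameterized field, like the type behind a generic TypeDescriptor, is a ParameterizedType rather than a Class, so a blind (Class<?>) cast throws ClassCastException. The helper below is a more defensive sketch that assumes the caller only needs the raw class. It checks for ParameterizedType and takes getRawType(). The names rawClass, sample and sampleType are hypothetical. Dropping the type arguments this way would not by itself make generic schemas work.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class RawClassDemo {
  // A field with a parameterized declared type, used only to obtain a ParameterizedType.
  public static List<String> sample;

  static Type sampleType() {
    try {
      return RawClassDemo.class.getField("sample").getGenericType();
    } catch (NoSuchFieldException e) {
      throw new IllegalStateException(e);
    }
  }

  /** Returns the raw class behind a Type instead of blindly casting the Type to Class. */
  static Class<?> rawClass(Type type) {
    if (type instanceof Class) {
      return (Class<?>) type;
    }
    if (type instanceof ParameterizedType) {
      // e.g. List<String> -> List; the type arguments are deliberately dropped here.
      return (Class<?>) ((ParameterizedType) type).getRawType();
    }
    throw new IllegalArgumentException("Unsupported type: " + type);
  }

  public static void main(String[] args) {
    Type generic = sampleType();
    try {
      // The same kind of blind cast that appears in the stack trace.
      Class<?> c = (Class<?>) generic;
      System.out.println("cast succeeded: " + c);
    } catch (ClassCastException e) {
      System.out.println("blind cast failed: " + e.getClass().getSimpleName());
    }
    System.out.println("rawClass: " + rawClass(generic));
  }
}
```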
>>>>>>
>>>>>> So my workaround for now, as elegant as roadkill, is to do the
>>>>>> conversion manually, as below (actual class names replaced with
>>>>>> placeholders such as RhsPojo):
>>>>>>
>>>>>> var mixed = PCollectionTuple.of("lhs", lhs).and("rhs", rhs)
>>>>>>     .apply(Cogroupby...)
>>>>>>     // .apply(Convert.to(joinedTypeDescriptor))
>>>>>>     .apply(MapElements.into(joinedTypeDescriptor).via(x -> {
>>>>>>       // Convert the key Row back into a JoinedKey POJO.
>>>>>>       var key = new JavaFieldSchema()
>>>>>>           .fromRowFunction(TypeDescriptor.of(JoinedKey.class))
>>>>>>           .apply(x.getRow("key"));
>>>>>>
>>>>>>       // Convert each lhs Row into an LhsType POJO.
>>>>>>       var lhsList = new ArrayList<LhsType>();
>>>>>>       var lhsFromRowFn = new JavaFieldSchema().fromRowFunction(lhsTypeDescriptor);
>>>>>>       for (var item : x.<Row>getIterable("lhs")) {
>>>>>>         lhsList.add(lhsFromRowFn.apply(item));
>>>>>>       }
>>>>>>
>>>>>>       // Convert each rhs Row into an RhsPojo.
>>>>>>       var rhsList = new ArrayList<RhsPojo>();
>>>>>>       var rhsFromRowFn =
>>>>>>           new JavaFieldSchema().fromRowFunction(TypeDescriptor.of(RhsPojo.class));
>>>>>>       for (var item : x.<Row>getIterable("rhs")) {
>>>>>>         rhsList.add(rhsFromRowFn.apply(item));
>>>>>>       }
>>>>>>
>>>>>>       // Assemble the joined POJO from the converted parts.
>>>>>>       var joined = new Joined<LhsType, RhsPojo>();
>>>>>>       joined.key = key;
>>>>>>       joined.lhs = lhsList;
>>>>>>       joined.rhs = rhsList;
>>>>>>       return joined;
>>>>>>     }));
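The two loops in the workaround differ only in the conversion function, so they could be folded into one generic helper. Below is a minimal sketch; convertAll is a hypothetical name. It uses java.util.function.Function so it runs standalone. Beam's SerializableFunction does not extend Function, so the function returned by fromRowFunction would have to be passed as a method reference.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ConvertAllDemo {
  /** Applies a per-element conversion to every item of an Iterable and collects the results. */
  static <R, T> List<T> convertAll(Iterable<R> items, Function<? super R, ? extends T> converter) {
    List<T> out = new ArrayList<>();
    for (R item : items) {
      out.add(converter.apply(item));
    }
    return out;
  }

  public static void main(String[] args) {
    // Strings stand in for Rows here; String::length stands in for a fromRow function.
    List<Integer> lengths = convertAll(List.of("1", "22", "333"), String::length);
    System.out.println(lengths); // prints [1, 2, 3]
  }
}
```

In the workaround, each list would then reduce to roughly joined.rhs = convertAll(x.<Row>getIterable("rhs"), rhsFromRowFn::apply);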
>>>>>>
>>>>>> Please let me know how this whole thing should behave. As with the
>>>>>> other email I sent earlier, I'll be happy to fix things (if they need
>>>>>> fixing) once I deliver some stuff for work. BTW, this is using Beam 2.31.
>>>>>>
>>>>>> Thank you for your time reading these walls of text, and for your
>>>>>> advice!
>>>>>>
>>>>>> Cheers,
>>>>>> Cristian
>>>>>>
>>>>>> [1]
>>>>>> http://beam.apache.org/releases/javadoc/2.31.0/org/apache/beam/sdk/schemas/transforms/CoGroup.html
>>>>>>
>>>>>
