I think I tried that (I'm about 80% sure; sorry for the uncertainty, I've
been trying many things for various problems), and it didn't work. However,
if I understand this solution correctly, I would have to create one of these
join classes for every pair of types I want to join. Is that right?
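
To check my understanding of why the type argument matters, here is a small
sketch using only plain Java reflection (no Beam). The class names are
hypothetical stand-ins, and I'm assuming schema inference inspects the field's
generic type in roughly this way; I haven't verified that in the Beam source.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

public class IterableFieldTypes {
    // Hypothetical stand-in for a real element type.
    public static class LhsPojo {}

    // Raw Iterable, as in my JoinedValue: no element type is recorded.
    public static class RawJoined {
        public Iterable lhs;
    }

    // Generic class: the element type is only a type variable.
    public static class GenericJoined<L> {
        public Iterable<L> lhs;
    }

    // One concrete class per join: the element type is recorded on the field.
    public static class ConcreteJoined {
        public Iterable<LhsPojo> lhs;
    }

    // Describe what reflection can recover about the element type of "lhs".
    public static String elementTypeOf(Class<?> clazz) {
        Type type;
        try {
            type = clazz.getDeclaredField("lhs").getGenericType();
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
        if (!(type instanceof ParameterizedType)) {
            return "unknown (raw type)";
        }
        Type arg = ((ParameterizedType) type).getActualTypeArguments()[0];
        if (arg instanceof TypeVariable) {
            return "type variable " + ((TypeVariable<?>) arg).getName();
        }
        if (arg instanceof Class) {
            return "class " + ((Class<?>) arg).getSimpleName();
        }
        return arg.getTypeName();
    }

    public static void main(String[] args) {
        System.out.println("raw:      " + elementTypeOf(RawJoined.class));
        System.out.println("generic:  " + elementTypeOf(GenericJoined.class));
        System.out.println("concrete: " + elementTypeOf(ConcreteJoined.class));
    }
}
```

Only the concrete variant exposes an actual element class, which is why I
suspect I'd need one such class per join.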

On Wed, Sep 15, 2021 at 5:56 PM Reuven Lax <re...@google.com> wrote:

> Could you actually fill in the generic type for Iterable? e.g.
> Iterable<LhsType> lhs; I think without that, the schema won't match.
>
> On Wed, Sep 15, 2021 at 2:52 PM Cristian Constantinescu <zei...@gmail.com>
> wrote:
>
>> Hi Reuven,
>>
>> Thanks for getting back to me.
>>
>> To answer your question, my initial Joined pojo is:
>>
>> @DefaultSchema(JavaFieldSchema.class)
>> public class JoinedValue {
>>   public JoinedKey key;
>>
>>   public Iterable lhs;
>>   public Iterable rhs;
>> }
>>
>>
>> Which is exactly the same as the documentation page, minus the field
>> names. This is my main concern: following the steps in the documentation
>> does not work when running the pipeline. I'll try to set up a sample
>> project to illustrate this if you think it would be helpful.
>>
>> On Wed, Sep 15, 2021 at 1:08 PM Reuven Lax <re...@google.com> wrote:
>>
>>>
>>>
>>> On Sun, Sep 12, 2021 at 7:01 PM Cristian Constantinescu <
>>> zei...@gmail.com> wrote:
>>>
>>>> Hello everyone,
>>>>
>>>> As I'm continuing to remove my usage of Row and replacing it with
>>>> Pojos, I'm following the documentation for the CoGroup transform [1].
>>>>
>>>> As per the documentation, I have created a JoinedKey and a JoinedValue,
>>>> exactly as in the examples given in the documentation, except that the
>>>> key has propA, propB and propC.
>>>>
>>>> I then execute this code:
>>>> PCollectionTuple.of("lhs", lhs).and("rhs",
>>>> rhs).apply(Cogroupby...).apply(Convert.to(Joined.class))
>>>>
>>>> And I get this:
>>>> Exception in thread "main" java.lang.RuntimeException: Cannot convert
>>>> between types that don't have equivalent schemas. input schema: Fields:
>>>> Field{name=key, description=, type=ROW<propA STRING, propB STRING,
>>>> propC STRING> NOT NULL, options={{}}}
>>>> Field{name=lhs, description=, type=ITERABLE NOT NULL, options={{}}}
>>>> Field{name=rhs, description=, type=ITERABLE NOT NULL, options={{}}}
>>>> Encoding positions:
>>>> {lhs=1, rhs=2, key=0}
>>>> Options:{{}}UUID: 40cef697-152b-4f3b-9110-e32a921915ca output schema:
>>>> Fields:
>>>> Field{name=key, description=, type=ROW<propA STRING, propB STRING,
>>>> propC STRING> NOT NULL, options={{}}}
>>>> Field{name=lhs, description=, type=ITERABLE NOT NULL, options={{}}}
>>>> Field{name=rhs, description=, type=ITERABLE NOT NULL, options={{}}}
>>>> Encoding positions:
>>>> {lhs=1, rhs=2, key=0}
>>>>
>>>> This is probably because lhs and rhs are Iterable, and it's trying to
>>>> compare the schemas of the CoGroup Rows for lhs and rhs with the Iterable
>>>> properties of the Joined pojo. We should update the documentation, as it
>>>> doesn't reflect how the code actually behaves (unless I missed something).
>>>>
>>>
>>> I'm not sure I understand the issue here. What exactly does your Joined
>>> pojo look like?
>>>
>>>>
>>>> My next step was to try to make the Joined Pojo generic. Like this:
>>>> @DefaultSchema(JavaFieldSchema.class)
>>>> public class Joined<Lhs, Rhs> {
>>>>     public JoinedKey key;
>>>>     public Iterable<Lhs> lhs;
>>>>     public Iterable<Rhs> rhs;
>>>> }
>>>>
>>>
>>> Unfortunately schema inference doesn't work today with generic classes.
>>> I believe that it's possible to fix this (e.g. we do support Coder
>>> inference in such cases), but today this won't work.
>>>
>>>
>>>>
>>>> And then execute this code:
>>>>
>>>> var joinedTypeDescriptor =
>>>>     new TypeDescriptor<Joined<MyLhsPojo, MyRhsPojo>>() {};
>>>>
>>>> var keyCoder = SchemaCoder.of(
>>>>     keySchema,
>>>>     TypeDescriptor.of(JoinedKey.class),
>>>>     new JavaFieldSchema().toRowFunction(TypeDescriptor.of(JoinedKey.class)),
>>>>     new JavaFieldSchema().fromRowFunction(TypeDescriptor.of(JoinedKey.class)));
>>>> var valueCoder = SchemaCoder.of(
>>>>     keySchema,
>>>>     joinedTypeDescriptor,
>>>>     new JavaFieldSchema().toRowFunction(joinedTypeDescriptor),
>>>>     new JavaFieldSchema().fromRowFunction(joinedTypeDescriptor));
>>>> var mixed = PCollectionTuple.of("lhs", lhs).and("rhs", rhs)
>>>>     .apply(Cogroupby...)
>>>>     .apply(Convert.to(joinedTypeDescriptor))
>>>>
>>>> But this gives me:
>>>> Exception in thread "main" java.lang.ClassCastException: class
>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types$ParameterizedTypeImpl
>>>> cannot be cast to class java.lang.Class
>>>> (org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types$ParameterizedTypeImpl
>>>> is in unnamed module of loader 'app'; java.lang.Class is in module
>>>> java.base of loader 'bootstrap')
>>>> at
>>>> org.apache.beam.sdk.schemas.GetterBasedSchemaProvider.fromRowFunction(GetterBasedSchemaProvider.java:105)
>>>> (irrelevant stacktrace omitted for brevity)
>>>>
>>>> It looks like GetterBasedSchemaProvider.fromRowFunction has an explicit
>>>> cast to "Class" but there could be instances where a guava type is passed
>>>> in.
>>>>
>>>> So my workaround for now, as elegant as roadkill, is to do things
>>>> manually as below (actual class names replaced with LhsType and RhsPojo).
>>>>
>>>> var mixed = PCollectionTuple.of("lhs", lhs).and("rhs", rhs)
>>>>     .apply(Cogroupby...)
>>>>     // .apply(Convert.to(joinedTypeDescriptor))
>>>>     .apply(MapElements.into(joinedTypeDescriptor).via(x -> {
>>>>         // Convert the key Row back into a JoinedKey.
>>>>         var key = new JavaFieldSchema()
>>>>             .fromRowFunction(TypeDescriptor.of(JoinedKey.class))
>>>>             .apply(x.getRow("key"));
>>>>
>>>>         // Convert each lhs Row into its pojo.
>>>>         var lhsList = new ArrayList<LhsType>();
>>>>         var rowJoinedSerializableFunction =
>>>>             new JavaFieldSchema().fromRowFunction(lhsTypeDescriptor);
>>>>         for (var item : x.<Row>getIterable("lhs")) {
>>>>             var lhsItem = rowJoinedSerializableFunction.apply(item);
>>>>             lhsList.add(lhsItem);
>>>>         }
>>>>
>>>>         // Convert each rhs Row into its pojo.
>>>>         var rhsList = new ArrayList<RhsPojo>();
>>>>         var rhsFromRowFn =
>>>>             new JavaFieldSchema().fromRowFunction(TypeDescriptor.of(RhsPojo.class));
>>>>         for (var item : x.<Row>getIterable("rhs")) {
>>>>             var rhsItem = rhsFromRowFn.apply(item);
>>>>             rhsList.add(rhsItem);
>>>>         }
>>>>
>>>>         var joined = new Joined<LhsType, RhsPojo>();
>>>>         joined.key = key;
>>>>         joined.lhs = lhsList;
>>>>         joined.rhs = rhsList;
>>>>         return joined;
>>>>     }));
>>>>
>>>> Please let me know how this whole thing should behave. As with the
>>>> other email I sent earlier, I'll be happy to fix things (if they need
>>>> fixing) once I deliver some stuff for work. BTW, this is using Beam 2.31.
>>>>
>>>> Thank you for taking the time to read these walls of text I'm sending,
>>>> and for your advice!
>>>>
>>>> Cheers,
>>>> Cristian
>>>>
>>>> [1]
>>>> http://beam.apache.org/releases/javadoc/2.31.0/org/apache/beam/sdk/schemas/transforms/CoGroup.html
>>>>
>>>
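
P.S. On the ClassCastException in my first email: as far as I can tell, an
anonymous type-descriptor subclass with a generic argument yields a
ParameterizedType rather than a Class, so any unconditional cast to Class
fails. Below is a minimal plain-Java sketch of that mechanism. TypeToken and
rawClassOf are hypothetical names of mine, not Beam APIs, and the JDK's own
ParameterizedType implementation stands in for the vendored Guava one from the
stack trace.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class ParameterizedTypeCast {
    // Hypothetical minimal type token: captures T via an anonymous subclass.
    public abstract static class TypeToken<T> {
        public Type getType() {
            ParameterizedType superType =
                (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    // Hypothetical safer alternative to a blind (Class) cast.
    public static Class<?> rawClassOf(Type type) {
        if (type instanceof Class) {
            return (Class<?>) type;
        }
        if (type instanceof ParameterizedType) {
            return (Class<?>) ((ParameterizedType) type).getRawType();
        }
        throw new IllegalArgumentException("Unsupported type: " + type);
    }

    public static void main(String[] args) {
        Type plain = new TypeToken<String>() {}.getType();
        Type generic = new TypeToken<List<String>>() {}.getType();

        // A non-generic argument is a Class, so the cast succeeds.
        Class<?> ok = (Class<?>) plain;
        System.out.println("plain cast ok: " + ok.getName());

        // A generic argument is a ParameterizedType, so the same cast throws.
        try {
            Class<?> bad = (Class<?>) generic;
            System.out.println("unexpected: " + bad);
        } catch (ClassCastException e) {
            System.out.println("generic cast failed, as in my stack trace");
        }

        // Unwrapping the raw type first avoids the exception.
        System.out.println("raw class: " + rawClassOf(generic).getName());
    }
}
```

Unwrapping the raw type only avoids the exception; it does not recover the
element types, so it would not by itself make schema inference work for
generic classes.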
