Hi Reuven,

Thanks for getting back to me.

To answer your question, my initial Joined pojo is:

@DefaultSchema(JavaFieldSchema.class)
public class JoinedValue {
  public JoinedKey key;

  public Iterable lhs;
  public Iterable rhs;
}


This is exactly the same as the example on the documentation page, apart from
the field names. That is my main concern: following the steps in the
documentation does not work when running the pipeline. I'll try to set up a
sample project to illustrate this if you think it would be helpful.
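
As a side note on the ClassCastException quoted further down: the same kind of
failure can be reproduced with plain Java reflection, without Beam. The sketch
below is only an illustration under stated assumptions, not Beam's actual
implementation. Token, Joined, unsafeRawClass and rawClass are hypothetical
names made up for this example. It shows that a type token for a generic class
carries a ParameterizedType rather than a Class, so an unchecked cast to Class
fails, while unwrapping the raw type first does not.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class TypeTokenDemo {

    // Hypothetical stand-in for a type descriptor: an anonymous subclass
    // records its type argument in the generic superclass signature.
    public abstract static class Token<T> {
        public final Type type;

        protected Token() {
            ParameterizedType superType =
                (ParameterizedType) getClass().getGenericSuperclass();
            this.type = superType.getActualTypeArguments()[0];
        }
    }

    // Hypothetical stand-in for the generic pojo discussed in this thread.
    public static class Joined<L, R> {
        public Iterable<L> lhs;
        public Iterable<R> rhs;
    }

    // Unchecked narrowing: throws ClassCastException when the type is
    // parameterized, because a ParameterizedType is not a Class.
    public static Class<?> unsafeRawClass(Type type) {
        return (Class<?>) type;
    }

    // Safer narrowing: unwrap a ParameterizedType to its raw class first.
    public static Class<?> rawClass(Type type) {
        if (type instanceof Class) {
            return (Class<?>) type;
        }
        if (type instanceof ParameterizedType) {
            return rawClass(((ParameterizedType) type).getRawType());
        }
        throw new IllegalArgumentException("Unsupported type: " + type);
    }

    public static void main(String[] args) {
        Type plain = new Token<String>() {}.type;
        Type generic = new Token<Joined<String, Integer>>() {}.type;

        // A non-generic type argument is already a Class, so the cast works.
        System.out.println(unsafeRawClass(plain));
        // The generic type argument has to be unwrapped to its raw class.
        System.out.println(rawClass(generic));
        try {
            unsafeRawClass(generic);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException for " + generic);
        }
    }
}
```

Note that recovering the raw class only avoids the cast failure in this sketch.
It discards the type arguments, so it would not by itself make schema
inference work for a generic pojo.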

On Wed, Sep 15, 2021 at 1:08 PM Reuven Lax <re...@google.com> wrote:

>
>
> On Sun, Sep 12, 2021 at 7:01 PM Cristian Constantinescu <zei...@gmail.com>
> wrote:
>
>> Hello everyone,
>>
>> As I'm continuing to remove my usage of Row and replacing it with Pojos,
>> I'm following the documentation for the CoGroup transform [1].
>>
>> As per the documentation, I have created a JoinedKey and a JoinedValue,
>> exactly as the examples given in the documentation except that the key has
>> propA, B and C.
>>
>> I then execute this code:
>> PCollectionTuple.of("lhs", lhs).and("rhs",
>> rhs).apply(Cogroupby...).apply(Convert.to(Joined.class))
>>
>> And I get this:
>> Exception in thread "main" java.lang.RuntimeException: Cannot convert
>> between types that don't have equivalent schemas. input schema: Fields:
>> Field{name=key, description=, type=ROW<propA STRING, propB STRING, propC
>> STRING> NOT NULL, options={{}}}
>> Field{name=lhs, description=, type=ITERABLE NOT NULL, options={{}}}
>> Field{name=rhs, description=, type=ITERABLE NOT NULL, options={{}}}
>> Encoding positions:
>> {lhs=1, rhs=2, key=0}
>> Options:{{}}UUID: 40cef697-152b-4f3b-9110-e32a921915ca output schema:
>> Fields:
>> Field{name=key, description=, type=ROW<propA STRING, propB STRING,
>> propC STRING> NOT NULL, options={{}}}
>> Field{name=lhs, description=, type=ITERABLE NOT NULL, options={{}}}
>> Field{name=rhs, description=, type=ITERABLE NOT NULL, options={{}}}
>> Encoding positions:
>> {lhs=1, rhs=2, key=0}
>>
>> This is probably because lhs and rhs are Iterable and it's trying to
>> compare the schemas from the CoGroup Rows for lhs and rhs and the Iterable
>> properties from the Joined pojo. We should update the documentation as it
>> doesn't reflect how the code actually behaves (unless I missed something).
>>
>
> I'm not sure I understand the issue here. What exactly does your Joined
> pojo look like?
>
>>
>> My next step was to try to make the Joined Pojo generic. Like this:
>> @DefaultSchema(JavaFieldSchema.class)
>> public class Joined<Lhs, Rhs> {
>>     public JoinedKey key;
>>     public Iterable<Lhs> lhs;
>>     public Iterable<Rhs> rhs;
>> }
>>
>
> Unfortunately schema inference doesn't work today with generic classes. I
> believe that it's possible to fix this (e.g. we do support Coder inference
> in such cases), but today this won't work.
>
>
>>
>> And then execute this code:
>>
>> var joinedTypeDescriptor = new TypeDescriptor<Joined<MyLhsPojo,
>> MyRhsPojo>>(){};
>>
>>    var keyCoder = SchemaCoder.of(keySchema,
>> TypeDescriptor.of(JoinedKey.class), new
>> JavaFieldSchema().toRowFunction(TypeDescriptor.of(JoinedKey.class)), new
>> JavaFieldSchema().fromRowFunction(TypeDescriptor.of(JoinedKey.class)));
>>         var valueCoder = SchemaCoder.of(keySchema, joinedTypeDescriptor,
>> new JavaFieldSchema().toRowFunction(joinedTypeDescriptor), new
>> JavaFieldSchema().fromRowFunction(joinedTypeDescriptor));
>>         var mixed = PCollectionTuple.of("lhs", lhs).and("rhs", rhs)
>>                 .apply(Cogroupby...)
>>                 .apply(Convert.to(joinedTypeDescriptor))
>>
>> But this gives me:
>> Exception in thread "main" java.lang.ClassCastException: class
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types$ParameterizedTypeImpl
>> cannot be cast to class java.lang.Class
>> (org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types$ParameterizedTypeImpl
>> is in unnamed module of loader 'app'; java.lang.Class is in module
>> java.base of loader 'bootstrap')
>> at
>> org.apache.beam.sdk.schemas.GetterBasedSchemaProvider.fromRowFunction(GetterBasedSchemaProvider.java:105)
>> (irrelevant stacktrace omitted for brevity)
>>
>> It looks like GetterBasedSchemaProvider.fromRowFunction has an explicit
>> cast to "Class" but there could be instances where a guava type is passed
>> in.
>>
>> So my workaround for now, as elegant as a roadkill, is to do things
>> manually as below. (actual class names replaced with RhsPojo)
>>
>>  var mixed = PCollectionTuple.of("lhs", lhs).and("rhs", rhs)
>>                 .apply(Cogroupby...)
>> //                .apply(Convert.to(joinedTypeDescriptor))
>>                 .apply(MapElements.into(joinedTypeDescriptor).via(x -> {
>>                     var key = new
>> JavaFieldSchema().fromRowFunction(TypeDescriptor.of(JoinedKey.class)).apply(x.getRow("key"));
>>
>>                     var lhsList = new ArrayList<LhsType>();
>>                     var rowJoinedSerializableFunction = new
>> JavaFieldSchema().fromRowFunction(lhsTypeDescriptor);
>>                     for (var item : x.<Row>getIterable("lhs")) {
>>                         var lhsItem =
>> rowJoinedSerializableFunction.apply(item);
>>                         lhsList.add(lhsItem);
>>                     }
>>
>>                     var rhsList = new ArrayList<RhsPojo>();
>>                     var rhsFromRowFn = new
>> JavaFieldSchema().fromRowFunction(TypeDescriptor.of(RhsPojo.class));
>>                     for (var item : x.<Row>getIterable("rhs")) {
>>                         var rhsItem = rhsFromRowFn.apply(item);
>>                         rhsList.add(rhsItem);
>>                     }
>>
>>                     var joined = new Joined<LhsType, RhsPojo>();
>>
>>                     joined.key = key;
>>                     joined.lhs = lhsList;
>>                     joined.rhs = rhsList;
>>                     return joined;
>>                 }));
>>
>> Please let me know how this whole thing should behave. As with the other
>> email I sent earlier, I'll be happy to fix things (if they need fixing)
>> once I deliver some stuff for work. BTW, this is using Beam 2.31.
>>
>> Thank you for your time reading these walls of text I'm sending, and for
>> your advice!
>>
>> Cheers,
>> Cristian
>>
>> [1]
>> http://beam.apache.org/releases/javadoc/2.31.0/org/apache/beam/sdk/schemas/transforms/CoGroup.html
>>
>
