Hi Reuven,

Thanks for the quick reply.
Could you elaborate on why Beam needs to create the Builder dynamically through reflection (basically, using reflection to create an instance of a package-private class)? As far as AutoValue goes, it looks like an anti-pattern to try to create an instance of the generated builder by calling the AutoValue-generated class (AutoValue_App_FooAutoValue in this case). I think that normally, the only place that can call the auto-generated builder constructor is the user-code abstract class (FooAutoValue), through:

public static Builder builder() {
    return new AutoValue_App_FooAutoValue.Builder();
}

In fact, this method is called directly when using the @SchemaCreate method, regardless of whether the create method is invoked through reflection or not. I guess what I'm asking is: could Beam not call FooAutoValue.builder() dynamically, if calling it directly is not possible?

On Tue, Oct 26, 2021 at 2:15 PM Reuven Lax <re...@google.com> wrote:

> Beam needs to create these elements dynamically when decoding records, so
> it can't easily call the builder directly.
>
> My first guess is that there's a classloader issue here. Flink does some
> fancy classloader munging, and that might be breaking an assumption in this
> code. Passing in the correct classloader should hopefully fix this.
>
> Reuven
>
> On Tue, Oct 26, 2021 at 10:59 AM Cristian Constantinescu <zei...@gmail.com>
> wrote:
>
>> Hi everyone,
>>
>> Not sure if anyone is using Beam with the Flink Runner and AutoValue
>> builders. For me, it doesn't work. I have some questions and a workaround
>> for anyone in the same boat.
>>
>> Beam 2.31, Flink 1.13, AutoValue 1.8.2
>>
>> Here's the code:
>>
>> package org.whatever.testing;
>>
>> import com.google.auto.value.AutoValue;
>> import org.apache.beam.sdk.Pipeline;
>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>> import org.apache.beam.sdk.schemas.AutoValueSchema;
>> import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
>> import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
>> import org.apache.beam.sdk.schemas.transforms.Convert;
>> import org.apache.beam.sdk.transforms.Create;
>> import org.apache.beam.sdk.transforms.MapElements;
>> import org.apache.beam.sdk.values.TypeDescriptor;
>>
>> import java.util.Arrays;
>>
>> public class App {
>>
>>     public static void main(String[] args) {
>>         var options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
>>
>>         var p = Pipeline.create(options);
>>         p
>>             .apply(Create.of(Arrays.asList(FooAutoValue.builder().setDummyProp("dummy").build())))
>>             .apply(Convert.to(FooAutoValue.class))
>>             .apply(MapElements.into(TypeDescriptor.of(FooAutoValue.class)).via(i -> {
>>                 System.out.println(i.toString());
>>                 return i;
>>             }))
>>         ;
>>         p.run().waitUntilFinish();
>>     }
>>
>>     @AutoValue
>>     @DefaultSchema(AutoValueSchema.class)
>>     public static abstract class FooAutoValue {
>>         public abstract String getDummyProp();
>>
>>         // @SchemaCreate
>>         // public static FooAutoValue create(String dummyProp) {
>>         //     return builder()
>>         //         .setDummyProp(dummyProp)
>>         //         .build();
>>         // }
>>
>>         public static Builder builder() {
>>             return new AutoValue_App_FooAutoValue.Builder();
>>         }
>>
>>         @AutoValue.Builder
>>         public abstract static class Builder {
>>             public abstract Builder setDummyProp(String newDummyProp);
>>
>>             public abstract FooAutoValue build();
>>         }
>>     }
>> }
>>
>> Note that it doesn't matter if FooAutoValue is an inner class or in its
>> own file as a top-level class.
>> For simplicity, here I'm converting the objects to the same class; in prod
>> code the input is of another type with an equivalent schema.
>>
>> And the stack trace:
>>
>> Caused by: java.lang.IllegalAccessError: failed to access class
>> org.whatever.testing.AutoValue_App_FooAutoValue$Builder from class
>> org.whatever.testing.SchemaUserTypeCreator$SchemaCodeGen$2thLkIj1
>> (org.whatever.testing.AutoValue_App_FooAutoValue$Builder is in unnamed
>> module of loader 'app';
>> org.whatever.testing.SchemaUserTypeCreator$SchemaCodeGen$2thLkIj1 is in
>> unnamed module of loader org.apache.flink.util.ChildFirstClassLoader
>> @26f4afda)
>>     at org.whatever.testing.SchemaUserTypeCreator$SchemaCodeGen$2thLkIj1.create(Unknown Source)
>>     at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:96)
>>     at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:66)
>>     at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
>>     at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:129)
>>     at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>>     at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:118)
>>     at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:101)
>>     at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:95)
>>     at org.apache.beam.sdk.transforms.Create$Values$BytesReader.advanceImpl(Create.java:518)
>>     at org.apache.beam.sdk.transforms.Create$Values$BytesReader.startImpl(Create.java:500)
>>     at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:252)
>>     at org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeStart(ReaderInvocationUtil.java:51)
>>     at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:80)
>>     at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:42)
>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:183)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>>     at java.base/java.lang.Thread.run(Thread.java:829)
>>
>> Workaround:
>> - Uncomment the @SchemaCreate method.
>> - Compile the code with "-parameters"; if using Maven:
>>
>> <plugin>
>>     <groupId>org.apache.maven.plugins</groupId>
>>     <artifactId>maven-compiler-plugin</artifactId>
>>     <version>3.8.1</version>
>>     <configuration>
>>         <compilerArgs>
>>             <arg>-parameters</arg>
>>         </compilerArgs>
>>         ....
>>
>> My questions:
>> 1. Why is Beam trying to get the generated AutoValue Builder through
>> reflection using AutoValueUtils.getAutoValueGeneratedName (which builds
>> "AutoValue_App_FooAutoValue$Builder"), instead of calling
>> FooAutoValue.builder() directly, without reflection?
>> 2. Given the fancy classloader work Flink does [1], in
>> AutoValueUtils.createBuilderCreator there is a call to
>> ReflectHelpers.findClassLoader(), which gets the thread classloader; in
>> this case it's a ChildFirstClassLoader. However, the builder is registered
>> as package-private in the app classloader. Is there a reason why Beam
>> registers the generated coder on the thread classloader?
>>
>> As per usual, if this is deemed a valid bug, please let me know and I can
>> eventually make a PR fixing it.
>>
>> Thank you,
>> Cristian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/
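To make the first question concrete, here's a minimal, Beam-free sketch of what I have in mind: the builder constructor stays hidden, but a public static factory can still be resolved and invoked reflectively from anywhere. (FactorySketch and its nested classes are stand-ins I made up, not the real AutoValue-generated code.)

```java
import java.lang.reflect.Method;

public class FactorySketch {
    // Stand-in for the user's abstract class. In real AutoValue code the
    // generated Builder constructor is package-private, but this static
    // factory is public, so it is callable from any classloader/package.
    public abstract static class FooAutoValue {
        public abstract String getDummyProp();

        public static Builder builder() {
            return new Builder();
        }

        public static class Builder {
            private String dummyProp;

            public Builder setDummyProp(String v) {
                this.dummyProp = v;
                return this;
            }

            public FooAutoValue build() {
                final String prop = dummyProp;
                return new FooAutoValue() {
                    @Override
                    public String getDummyProp() {
                        return prop;
                    }
                };
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // What I'm suggesting Beam could do when decoding a record:
        // resolve the public static factory by name instead of the
        // package-private generated Builder constructor.
        Method factory = FooAutoValue.class.getMethod("builder");
        FooAutoValue.Builder builder = (FooAutoValue.Builder) factory.invoke(null);
        FooAutoValue value = builder.setDummyProp("dummy").build();
        System.out.println(value.getDummyProp());
    }
}
```

Since the factory is public, this reflective call works regardless of which package or classloader the caller was generated in.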
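On question 2, a tiny self-contained check of the JVM rule I believe is at play (nothing here is Beam or Flink code; LoaderSketch is a made-up name): package-private access only works within the same runtime package, which means same package name AND same defining classloader.

```java
public class LoaderSketch {
    public static void main(String[] args) {
        // The loader that defined this class vs. the thread context loader.
        ClassLoader defining = LoaderSketch.class.getClassLoader();
        ClassLoader context = Thread.currentThread().getContextClassLoader();
        System.out.println("defining loader: " + defining);
        System.out.println("context loader:  " + context);
        // In a plain JVM these usually coincide. In the stack trace above,
        // the Builder is in loader 'app' while the generated creator is in
        // a ChildFirstClassLoader, so the two classes are NOT in the same
        // runtime package despite sharing the package name, and
        // package-private access fails with IllegalAccessError.
        System.out.println("same loader: " + (defining == context));
    }
}
```

That's why generating the creator against the thread classloader (a ChildFirstClassLoader under Flink) instead of the Builder's own defining classloader would produce exactly this error.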
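Regarding the workaround: as I understand it, "-parameters" matters because @SchemaCreate matches the create method's arguments to schema fields by parameter name, and javac only keeps those names in the class file when given that flag. A standalone sketch of what reflection sees either way (ParamSketch is a made-up name, no Beam involved):

```java
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;

public class ParamSketch {
    // Stand-in for a @SchemaCreate-style factory with one named parameter.
    public static String create(String dummyProp) {
        return dummyProp;
    }

    public static void main(String[] args) throws Exception {
        Method m = ParamSketch.class.getMethod("create", String.class);
        for (Parameter p : m.getParameters()) {
            // Compiled without -parameters: prints "arg0 / false".
            // Compiled with -parameters:    prints "dummyProp / true".
            System.out.println(p.getName() + " / " + p.isNamePresent());
        }
    }
}
```

Without the flag, a name-based field lookup only sees "arg0", which is why the @SchemaCreate path needs the compiler argument.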