The exception is thrown when the StreamGraph is translated to a JobGraph.
The translation logic branches on the checkpointing setting: if checkpointing
is enabled, the Java code generated by the optimizer is compiled directly in
the client (instead of on the TaskManagers when the operators are started).
The compiler needs access to the UDF classes, but the classloader that is
being used doesn't know about them.

The classloader used for the compilation is created by PackagedProgram.
You can configure the PackagedProgram with the user code JAR URLs that
contain the UDF classes.
Alternatively, you can try to inject another classloader into
PackagedProgram using reflection (but that's a rather hacky approach).
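
To illustrate the classloader point with plain JDK classes, here is a minimal
sketch. It is not Flink code: the class ClassLoaderDemo, its nested Udf class
and the canLoad helper are hypothetical names made up for this example. It
only shows that a class can be resolved by name through a classloader that
knows where the class files are, which is the situation the compiler is in
when the classloader lacks the user code JAR URLs.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.security.CodeSource;

/** Hypothetical demo (not Flink code): class visibility depends on the loader's URLs. */
public class ClassLoaderDemo {

    /** Stand-in for a user-defined function class that lives in the user jar. */
    public static class Udf {}

    /** Returns true if the given loader can resolve the class name, false otherwise. */
    public static boolean canLoad(ClassLoader loader, String className) {
        try {
            // initialize=false: only resolve the class, do not run static initializers.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        String udfName = Udf.class.getName();

        // A loader with no URLs and no parent only sees the JDK bootstrap classes.
        try (URLClassLoader withoutUserCode = new URLClassLoader(new URL[0], null)) {
            System.out.println("without user code URLs: " + canLoad(withoutUserCode, udfName));
        }

        // Location (directory or jar) this class was loaded from; may be unavailable.
        CodeSource source = Udf.class.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            System.out.println("code source unknown, skipping second check");
            return;
        }

        // Same isolated setup, but now the loader is told where the class files are.
        try (URLClassLoader withUserCode =
                new URLClassLoader(new URL[] {source.getLocation()}, null)) {
            System.out.println("with user code URLs: " + canLoad(withUserCode, udfName));
        }
    }
}
```

The first loader should not be able to resolve Udf because it has no URL that
points at the class files. In the Flink case, the analogous step would be
handing the user code JAR URLs to the PackagedProgram, as described above.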

Hope this helps.

Cheers, Fabian

On Wed, 17 Jun 2020 at 06:48, Jark Wu <imj...@gmail.com> wrote:

> Why compile JobGraph yourself? This is really an internal API and may cause
> problems.
> Could you try to use `flink run` command [1] to submit your user jar
> instead?
>
> Btw, what's your Flink version? If you are using Flink 1.10.0, could you
> try to use 1.10.1?
>
> Best,
> Jark
>
> On Wed, 17 Jun 2020 at 12:41, 杜斌 <dubin...@gmail.com> wrote:
>
> > Thanks for the reply,
> > Here is a simple Java program that reproduces the problem:
> > 1. Code for the application:
> >
> > import org.apache.flink.api.java.tuple.Tuple2;
> > import org.apache.flink.streaming.api.datastream.DataStream;
> > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > import org.apache.flink.table.api.EnvironmentSettings;
> > import org.apache.flink.table.api.Table;
> > import org.apache.flink.table.api.java.StreamTableEnvironment;
> > import org.apache.flink.types.Row;
> >
> >
> > import java.util.Arrays;
> >
> > public class Test {
> >     public static void main(String[] args) throws Exception {
> >
> >         StreamExecutionEnvironment env =
> >                 StreamExecutionEnvironment.getExecutionEnvironment();
> >         // If checkpointing is enabled, the Blink planner fails to compile.
> >         env.enableCheckpointing(1000);
> >
> >         EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> > //                .useBlinkPlanner() // compilation fails
> >                 .useOldPlanner() // compilation succeeds
> >                 .inStreamingMode()
> >                 .build();
> >         StreamTableEnvironment tEnv =
> >                 StreamTableEnvironment.create(env, envSettings);
> >         DataStream<Order> orderA = env.fromCollection(Arrays.asList(
> >                 new Order(1L, "beer", 3),
> >                 new Order(1L, "diaper", 4),
> >                 new Order(3L, "beer", 2)
> >         ));
> >
> > //        Table table = tEnv.fromDataStream(orderA);
> >         tEnv.createTemporaryView("orderA", orderA);
> >         Table res = tEnv.sqlQuery("SELECT * FROM orderA");
> >         DataStream<Tuple2<Boolean, Row>> ds =
> >                 tEnv.toRetractStream(res, Row.class);
> >         ds.print();
> >         env.execute();
> >
> >     }
> >
> >     public static class Order {
> >         public long user;
> >         public String product;
> >         public int amount;
> >
> >         public Order(long user, String product, int amount) {
> >             this.user = user;
> >             this.product = product;
> >             this.amount = amount;
> >         }
> >
> >         public long getUser() {
> >             return user;
> >         }
> >
> >         public void setUser(long user) {
> >             this.user = user;
> >         }
> >
> >         public String getProduct() {
> >             return product;
> >         }
> >
> >         public void setProduct(String product) {
> >             this.product = product;
> >         }
> >
> >         public int getAmount() {
> >             return amount;
> >         }
> >
> >         public void setAmount(int amount) {
> >             this.amount = amount;
> >         }
> >     }
> > }
> >
> > 2. Run `mvn clean package` to build a jar file.
> > 3. Then we use the following code to produce a JobGraph:
> >
> > PackagedProgram program =
> >         PackagedProgram.newBuilder()
> >                 .setJarFile(userJarPath)
> >                 .setUserClassPaths(classpaths)
> >                 .setEntryPointClassName(userMainClass)
> >                 .setConfiguration(configuration)
> >                 .setSavepointRestoreSettings(
> >                         (descriptor.isRecoverFromSavepoint()
> >                                 && descriptor.getSavepointPath() != null
> >                                 && !descriptor.getSavepointPath().equals(""))
> >                                 ? SavepointRestoreSettings.forPath(
> >                                         descriptor.getSavepointPath(),
> >                                         descriptor.isAllowNonRestoredState())
> >                                 : SavepointRestoreSettings.none())
> >                 .setArguments(userArgs)
> >                 .build();
> >
> > JobGraph jobGraph =
> >         PackagedProgramUtils.createJobGraph(program, configuration, 4, true);
> >
> > 4. If we use the Blink planner and enable checkpointing, compilation
> > fails. In all other cases, compilation succeeds.
> >
> > Thanks,
> > Bin
> >
> > On Wed, 17 Jun 2020 at 10:42, Jark Wu <imj...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > Which Flink version are you using? Are you using SQL CLI? Could you
> > > share your table/sql program?
> > > We did fix some classloading problems around SQL CLI, e.g. FLINK-18302
> > >
> > > Best,
> > > Jark
> > >
> > > On Wed, 17 Jun 2020 at 10:31, 杜斌 <dubin...@gmail.com> wrote:
> > >
> > > > add the full stack trace here:
> > > >
> > > >
> > > > Caused by: org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
> > > >     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
> > > >     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
> > > >     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
> > > >     at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66)
> > > >     ... 14 more
> > > > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
> > > >     at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
> > > >     at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66)
> > > >     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
> > > >     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
> > > >     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
> > > >     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
> > > >     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
> > > >     ... 17 more
> > > > Caused by: org.codehaus.commons.compiler.CompileException: Line 2, Column 46: Cannot determine simple type name "org"
> > > >     at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
> > > >     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
> > > >     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
> > > >     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > >     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > >     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > >     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > >     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > >     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > >     at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
> > > >     at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
> > > >     at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
> > > >     at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
> > > >     at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
> > > >     at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
> > > >     at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
> > > >     at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
> > > >     at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
> > > >     at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215)
> > > >     at org.codehaus.janino.UnitCompiler$33.getSuperclass2(UnitCompiler.java:9886)
> > > >     at org.codehaus.janino.IClass.getSuperclass(IClass.java:455)
> > > >     at org.codehaus.janino.IClass.getIMethods(IClass.java:260)
> > > >     at org.codehaus.janino.IClass.getIMethods(IClass.java:237)
> > > >     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:492)
> > > >     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
> > > >     at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
> > > >     at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
> > > >     at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
> > > >     at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
> > > >     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
> > > >     at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
> > > >     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
> > > >     at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
> > > >     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
> > > >     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
> > > >     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
> > > >     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
> > > >     at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78)
> > > >     ... 23 more
> > > >
> > > > On Wed, 17 Jun 2020 at 10:29, 杜斌 <dubin...@gmail.com> wrote:
> > > >
> > > > > Hi,
> > > > > I need help with this issue. Here is what Flink reported when I enabled
> > > > > checkpointing on the StreamExecutionEnvironment:
> > > > >
> > > > > /* 1 */
> > > > > /* 2 */      public class SourceConversion$1 extends org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
> > > > > /* 3 */          implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
> > > > > /* 4 */
> > > > > /* 5 */        private final Object[] references;
> > > > > /* 6 */        private transient org.apache.flink.table.dataformat.DataFormatConverters.RowConverter converter$0;
> > > > > /* 7 */        private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
> > > > > /* 8 */
> > > > > /* 9 */        public SourceConversion$1(
> > > > > /* 10 */            Object[] references,
> > > > > /* 11 */            org.apache.flink.streaming.runtime.tasks.StreamTask task,
> > > > > /* 12 */            org.apache.flink.streaming.api.graph.StreamConfig config,
> > > > > /* 13 */            org.apache.flink.streaming.api.operators.Output output) throws Exception {
> > > > > /* 14 */          this.references = references;
> > > > > /* 15 */          converter$0 = (((org.apache.flink.table.dataformat.DataFormatConverters.RowConverter) references[0]));
> > > > > /* 16 */          this.setup(task, config, output);
> > > > > /* 17 */        }
> > > > > /* 18 */
> > > > > /* 19 */        @Override
> > > > > /* 20 */        public void open() throws Exception {
> > > > > /* 21 */          super.open();
> > > > > /* 22 */
> > > > > /* 23 */        }
> > > > > /* 24 */
> > > > > /* 25 */        @Override
> > > > > /* 26 */        public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
> > > > > /* 27 */          org.apache.flink.table.dataformat.BaseRow in1 = (org.apache.flink.table.dataformat.BaseRow) (org.apache.flink.table.dataformat.BaseRow) converter$0.toInternal((org.apache.flink.types.Row) element.getValue());
> > > > > /* 28 */
> > > > > /* 29 */
> > > > > /* 30 */
> > > > > /* 31 */          output.collect(outElement.replace(in1));
> > > > > /* 32 */        }
> > > > > /* 33 */
> > > > > /* 34 */
> > > > > /* 35 */
> > > > > /* 36 */        @Override
> > > > > /* 37 */        public void close() throws Exception {
> > > > > /* 38 */           super.close();
> > > > > /* 39 */
> > > > > /* 40 */        }
> > > > > /* 41 */
> > > > > /* 42 */
> > > > > /* 43 */      }
> > > > > /* 44 */
> > > > >
> > > > > Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
> > > > >     at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
> > > > >     at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
> > > > >     at org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:96)
> > > > >     at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:62)
> > > > >     at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:214)
> > > > >     at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149)
> > > > >     at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:104)
> > > > >     at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:777)
> > > > >     at org.apache.flink.streaming.api.graph.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
> > > > >     at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
> > > > >     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:57)
> > > > >     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:85)
> > > > >
> > > > > Janino compiles the code successfully when I remove the checkpoint
> > > > > setting from the env.
> > > > >
> > > > > Can anyone help with this?
> > > > >
> > > > > Thanks,
> > > > > Bin
> > > > >
> > > >
> > >
> >
>
