Why do you compile the JobGraph yourself? This is really an internal API and
may cause problems.
Could you try the `flink run` command [1] to submit your user jar instead?
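
If the JobGraph really does have to be built in your own code, one thing
that may be worth ruling out is the thread context class loader. The trace
you posted goes through StreamingJobGraphGenerator.preValidate and
CodeGenOperatorFactory.getStreamOperatorClass, and Janino fails on the very
first fully qualified name in the generated class, which would fit a class
loader that cannot see the Flink table classes. That diagnosis is an
assumption, not something confirmed in this thread. Below is a minimal,
Flink-free sketch of swapping the context class loader around a call and
restoring it afterwards; ContextClassLoaderScope, ThrowingSupplier and
callWith are hypothetical names made up for the illustration:

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Hypothetical helper: runs an action with a temporary thread context class loader. */
final class ContextClassLoaderScope {

    /** Like java.util.function.Supplier, but allowed to throw a checked exception of type E. */
    @FunctionalInterface
    interface ThrowingSupplier<T, E extends Exception> {
        T get() throws E;
    }

    private ContextClassLoaderScope() {}

    /** Sets {@code loader} as the context class loader, runs the action, then restores the old one. */
    static <T, E extends Exception> T callWith(ClassLoader loader, ThrowingSupplier<T, E> action) throws E {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            // Always restore the caller's loader, even if the action throws.
            thread.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        // Stand-in for the user-code class loader of a packaged program (assumption).
        try (URLClassLoader userLoader = new URLClassLoader(new URL[0], original)) {
            ClassLoader seen = callWith(userLoader, () -> Thread.currentThread().getContextClassLoader());
            System.out.println("user loader active inside action: " + (seen == userLoader));
        }
        boolean restored = Thread.currentThread().getContextClassLoader() == original;
        System.out.println("original loader restored: " + restored);
    }
}
```

In your setup the action would presumably be the
PackagedProgramUtils.createJobGraph(program, configuration, 4, true) call
from your snippet, with the program's user-code class loader passed as
`loader` (assuming PackagedProgram#getUserCodeClassLoader() is available in
your version).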

Btw, what's your Flink version? If you are using Flink 1.10.0, could you
try to use 1.10.1?

Best,
Jark

On Wed, 17 Jun 2020 at 12:41, 杜斌 <dubin...@gmail.com> wrote:

> Thanks for the reply.
> Here is a simple Java program that reproduces the problem:
> 1. Code for the application:
>
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
>
>
> import java.util.Arrays;
>
> public class Test {
>     public static void main(String[] args) throws Exception {
>
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         /**
>          * If checkpointing is enabled, compilation with the Blink planner fails.
>          */
>         env.enableCheckpointing(1000);
>
>         EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> //                .useBlinkPlanner() // compilation fails
>                 .useOldPlanner() // compilation succeeds
>                 .inStreamingMode()
>                 .build();
>         StreamTableEnvironment tEnv =
>                 StreamTableEnvironment.create(env, envSettings);
>         DataStream<Order> orderA = env.fromCollection(Arrays.asList(
>                 new Order(1L, "beer", 3),
>                 new Order(1L, "diaper", 4),
>                 new Order(3L, "beer", 2)
>         ));
>
> //        Table table = tEnv.fromDataStream(orderA);
>         tEnv.createTemporaryView("orderA", orderA);
>         Table res = tEnv.sqlQuery("SELECT * FROM orderA");
>         DataStream<Tuple2<Boolean, Row>> ds =
>                 tEnv.toRetractStream(res, Row.class);
>         ds.print();
>         env.execute();
>
>     }
>
>     public static class Order {
>         public long user;
>         public String product;
>         public int amount;
>
>         public Order(long user, String product, int amount) {
>             this.user = user;
>             this.product = product;
>             this.amount = amount;
>         }
>
>         public long getUser() {
>             return user;
>         }
>
>         public void setUser(long user) {
>             this.user = user;
>         }
>
>         public String getProduct() {
>             return product;
>         }
>
>         public void setProduct(String product) {
>             this.product = product;
>         }
>
>         public int getAmount() {
>             return amount;
>         }
>
>         public void setAmount(int amount) {
>             this.amount = amount;
>         }
>     }
> }
>
> 2. Run `mvn clean package` to build a jar file.
> 3. Then we use the following code to produce a JobGraph:
>
> PackagedProgram program =
>         PackagedProgram.newBuilder()
>                 .setJarFile(userJarPath)
>                 .setUserClassPaths(classpaths)
>                 .setEntryPointClassName(userMainClass)
>                 .setConfiguration(configuration)
>                 .setSavepointRestoreSettings(
>                         (descriptor.isRecoverFromSavepoint()
>                                 && descriptor.getSavepointPath() != null
>                                 && !descriptor.getSavepointPath().equals(""))
>                                 ? SavepointRestoreSettings.forPath(
>                                         descriptor.getSavepointPath(),
>                                         descriptor.isAllowNonRestoredState())
>                                 : SavepointRestoreSettings.none())
>                 .setArguments(userArgs)
>                 .build();
>
>
> JobGraph jobGraph =
>         PackagedProgramUtils.createJobGraph(program, configuration, 4, true);
>
> 4. If we use the Blink planner and enable checkpointing, the compilation
> fails. In all other cases, the compilation succeeds.
>
> Thanks,
> Bin
>
> On Wed, 17 Jun 2020 at 10:42, Jark Wu <imj...@gmail.com> wrote:
>
> > Hi,
> >
> > Which Flink version are you using? Are you using SQL CLI? Could you share
> > your table/sql program?
> > We did fix some classloading problems around SQL CLI, e.g. FLINK-18302
> >
> > Best,
> > Jark
> >
> > On Wed, 17 Jun 2020 at 10:31, 杜斌 <dubin...@gmail.com> wrote:
> >
> > > Adding the full stack trace here:
> > >
> > >
> > > Caused by: org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
> > > at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
> > > at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
> > > at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
> > > at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66)
> > > ... 14 more
> > > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
> > > at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
> > > at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66)
> > > at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
> > > at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
> > > at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
> > > at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
> > > at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
> > > ... 17 more
> > > Caused by: org.codehaus.commons.compiler.CompileException: Line 2, Column 46: Cannot determine simple type name "org"
> > > at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
> > > at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
> > > at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
> > > at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
> > > at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
> > > at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
> > > at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
> > > at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
> > > at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
> > > at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
> > > at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
> > > at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
> > > at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215)
> > > at org.codehaus.janino.UnitCompiler$33.getSuperclass2(UnitCompiler.java:9886)
> > > at org.codehaus.janino.IClass.getSuperclass(IClass.java:455)
> > > at org.codehaus.janino.IClass.getIMethods(IClass.java:260)
> > > at org.codehaus.janino.IClass.getIMethods(IClass.java:237)
> > > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:492)
> > > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
> > > at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
> > > at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
> > > at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
> > > at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
> > > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
> > > at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
> > > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
> > > at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
> > > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
> > > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
> > > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
> > > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
> > > at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78)
> > > ... 23 more
> > >
> > > On Wed, 17 Jun 2020 at 10:29, 杜斌 <dubin...@gmail.com> wrote:
> > >
> > > > Hi,
> > > > I need help with this issue. Here is what Flink reported when I
> > > > enabled checkpointing on the StreamExecutionEnvironment:
> > > >
> > > > /* 1 */
> > > > /* 2 */      public class SourceConversion$1 extends org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
> > > > /* 3 */          implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
> > > > /* 4 */
> > > > /* 5 */        private final Object[] references;
> > > > /* 6 */        private transient org.apache.flink.table.dataformat.DataFormatConverters.RowConverter converter$0;
> > > > /* 7 */        private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
> > > > /* 8 */
> > > > /* 9 */        public SourceConversion$1(
> > > > /* 10 */            Object[] references,
> > > > /* 11 */            org.apache.flink.streaming.runtime.tasks.StreamTask task,
> > > > /* 12 */            org.apache.flink.streaming.api.graph.StreamConfig config,
> > > > /* 13 */            org.apache.flink.streaming.api.operators.Output output) throws Exception {
> > > > /* 14 */          this.references = references;
> > > > /* 15 */          converter$0 = (((org.apache.flink.table.dataformat.DataFormatConverters.RowConverter) references[0]));
> > > > /* 16 */          this.setup(task, config, output);
> > > > /* 17 */        }
> > > > /* 18 */
> > > > /* 19 */        @Override
> > > > /* 20 */        public void open() throws Exception {
> > > > /* 21 */          super.open();
> > > > /* 22 */
> > > > /* 23 */        }
> > > > /* 24 */
> > > > /* 25 */        @Override
> > > > /* 26 */        public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
> > > > /* 27 */          org.apache.flink.table.dataformat.BaseRow in1 = (org.apache.flink.table.dataformat.BaseRow) (org.apache.flink.table.dataformat.BaseRow) converter$0.toInternal((org.apache.flink.types.Row) element.getValue());
> > > > /* 28 */
> > > > /* 29 */
> > > > /* 30 */
> > > > /* 31 */          output.collect(outElement.replace(in1));
> > > > /* 32 */        }
> > > > /* 33 */
> > > > /* 34 */
> > > > /* 35 */
> > > > /* 36 */        @Override
> > > > /* 37 */        public void close() throws Exception {
> > > > /* 38 */           super.close();
> > > > /* 39 */
> > > > /* 40 */        }
> > > > /* 41 */
> > > > /* 42 */
> > > > /* 43 */      }
> > > > /* 44 */
> > > >
> > > > Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
> > > > at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
> > > > at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
> > > > at org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:96)
> > > > at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:62)
> > > > at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:214)
> > > > at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149)
> > > > at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:104)
> > > > at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:777)
> > > > at org.apache.flink.streaming.api.graph.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
> > > > at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
> > > > at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:57)
> > > > at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:85)
> > > >
> > > > Janino compiles the generated code successfully when I remove the
> > > > checkpoint setting from the env.
> > > >
> > > > Can anyone help with this?
> > > >
> > > > Thanks,
> > > > Bin
> > > >
> > >
> >
>
