Thanks for the reply, Here is the simple java program that re-produce the problem: 1. code for the application:
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.types.Row; import java.util.Arrays; public class Test { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /** * If enable checkpoint, blink planner will failed */ env.enableCheckpointing(1000); EnvironmentSettings envSettings = EnvironmentSettings.newInstance() // .useBlinkPlanner() // compile fail .useOldPlanner() // compile success .inStreamingMode() .build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings); DataStream<Order> orderA = env.fromCollection(Arrays.asList( new Order(1L, "beer", 3), new Order(1L, "diaper", 4), new Order(3L, "beer", 2) )); // Table table = tEnv.fromDataStream(orderA); tEnv.createTemporaryView("orderA", orderA); Table res = tEnv.sqlQuery("SELECT * FROM orderA"); DataStream<Tuple2<Boolean, Row>> ds = tEnv.toRetractStream(res, Row.class); ds.print(); env.execute(); } public static class Order { public long user; public String product; public int amount; public Order(long user, String product, int amount) { this.user = user; this.product = product; this.amount = amount; } public long getUser() { return user; } public void setUser(long user) { this.user = user; } public String getProduct() { return product; } public void setProduct(String product) { this.product = product; } public int getAmount() { return amount; } public void setAmount(int amount) { this.amount = amount; } } } 2. mvn clean package to a jar file 3. then we use the following code to produce a jobgraph: PackagedProgram program = PackagedProgram.newBuilder() .setJarFile(userJarPath) .setUserClassPaths(classpaths) .setEntryPointClassName(userMainClass) .setConfiguration(configuration) .setSavepointRestoreSettings((descriptor.isRecoverFromSavepoint() && descriptor.getSavepointPath() != null && !descriptor.getSavepointPath().equals("")) ? SavepointRestoreSettings.forPath(descriptor.getSavepointPath(), descriptor.isAllowNonRestoredState()) : SavepointRestoreSettings.none()) .setArguments(userArgs) .build(); JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, 4, true); 4. If we use blink planner & enable checkpoint, the compile will failed. For the others, the compile success. Thanks, Bin Jark Wu <imj...@gmail.com> 于2020年6月17日周三 上午10:42写道: > Hi, > > Which Flink version are you using? Are you using SQL CLI? Could you share > your table/sql program? > We did fix some classloading problems around SQL CLI, e.g. FLINK-18302 > > Best, > Jark > > On Wed, 17 Jun 2020 at 10:31, 杜斌 <dubin...@gmail.com> wrote: > > > add the full stack trace here: > > > > > > Caused by: > > > > > org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: > > org.apache.flink.api.common.InvalidProgramException: Table program cannot > > be compiled. This is a bug. Please file an issue. > > at > > > > > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203) > > at > > > > > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937) > > at > > > > > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739) > > at > > > > > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66) > > ... 14 more > > Caused by: org.apache.flink.api.common.InvalidProgramException: Table > > program cannot be compiled. This is a bug. Please file an issue. > > at > > > > > org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81) > > at > > > > > org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66) > > at > > > > > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742) > > at > > > > > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527) > > at > > > > > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319) > > at > > > > > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282) > > at > > > > > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197) > > ... 17 more > > Caused by: org.codehaus.commons.compiler.CompileException: Line 2, Column > > 46: Cannot determine simple type name "org" > > at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124) > > at > > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746) > > at > > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507) > > at > > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) > > at > > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) > > at > > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) > > at > > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) > > at > > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) > > at > > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) > > at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486) > > at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215) > > at > > > > > org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394) > > at > > > > > org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389) > > at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917) > > at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389) > > at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382) > > at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916) > > at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382) > > at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215) > > at > > > org.codehaus.janino.UnitCompiler$33.getSuperclass2(UnitCompiler.java:9886) > > at org.codehaus.janino.IClass.getSuperclass(IClass.java:455) > > at org.codehaus.janino.IClass.getIMethods(IClass.java:260) > > at org.codehaus.janino.IClass.getIMethods(IClass.java:237) > > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:492) > > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432) > > at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215) > > at > > > > > org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411) > > at > > > > > org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406) > > at > > > > > org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414) > > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406) > > at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378) > > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237) > > at > > > > > org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465) > > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216) > > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) > > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) > > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) > > at > > > > > org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78) > > ... 23 more > > > > 杜斌 <dubin...@gmail.com> 于2020年6月17日周三 上午10:29写道: > > > > > Hi, > > > Need help on this issue, here is what Flink reported when I enable the > > > checkpoint setting of the StreamExecutionEnvironment: > > > > > > /* 1 */ > > > /* 2 */ public class SourceConversion$1 extends > > > org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator > > > /* 3 */ implements > > > org.apache.flink.streaming.api.operators.OneInputStreamOperator { > > > /* 4 */ > > > /* 5 */ private final Object[] references; > > > /* 6 */ private transient > > > org.apache.flink.table.dataformat.DataFormatConverters.RowConverter > > > converter$0; > > > /* 7 */ private final > > > org.apache.flink.streaming.runtime.streamrecord.StreamRecord > outElement = > > > new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null); > > > /* 8 */ > > > /* 9 */ public SourceConversion$1( > > > /* 10 */ Object[] references, > > > /* 11 */ org.apache.flink.streaming.runtime.tasks.StreamTask > > > task, > > > /* 12 */ org.apache.flink.streaming.api.graph.StreamConfig > > > config, > > > /* 13 */ org.apache.flink.streaming.api.operators.Output > > > output) throws Exception { > > > /* 14 */ this.references = references; > > > /* 15 */ converter$0 = > > > (((org.apache.flink.table.dataformat.DataFormatConverters.RowConverter) > > > references[0])); > > > /* 16 */ this.setup(task, config, output); > > > /* 17 */ } > > > /* 18 */ > > > /* 19 */ @Override > > > /* 20 */ public void open() throws Exception { > > > /* 21 */ super.open(); > > > /* 22 */ > > > /* 23 */ } > > > /* 24 */ > > > /* 25 */ @Override > > > /* 26 */ public void > > > > > > processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord > > > element) throws Exception { > > > /* 27 */ org.apache.flink.table.dataformat.BaseRow in1 = > > > (org.apache.flink.table.dataformat.BaseRow) > > > (org.apache.flink.table.dataformat.BaseRow) > > > converter$0.toInternal((org.apache.flink.types.Row) > element.getValue()); > > > /* 28 */ > > > /* 29 */ > > > /* 30 */ > > > /* 31 */ output.collect(outElement.replace(in1)); > > > /* 32 */ } > > > /* 33 */ > > > /* 34 */ > > > /* 35 */ > > > /* 36 */ @Override > > > /* 37 */ public void close() throws Exception { > > > /* 38 */ super.close(); > > > /* 39 */ > > > /* 40 */ } > > > /* 41 */ > > > /* 42 */ > > > /* 43 */ } > > > /* 44 */ > > > > > > Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: > > > org.apache.flink.api.common.InvalidProgramException: Table program > cannot > > > be compiled. This is a bug. Please file an issue. > > > at > > > > > > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68) > > > at > > > > > > org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78) > > > at > > > > > > org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:96) > > > at > > > > > > org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:62) > > > at > > > > > > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:214) > > > at > > > > > > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149) > > > at > > > > > > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:104) > > > at > > > > > > org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:777) > > > at > > > > > > org.apache.flink.streaming.api.graph.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52) > > > at > > > > > > org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43) > > > at > > > > > > org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:57) > > > at > > > > > > org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:85) > > > > > > The janino will compile successfully when I remove the checkpoint > setting > > > of the env. > > > > > > Can anyone help on this? > > > > > > Thanks, > > > Bin > > > > > >