[ 
https://issues.apache.org/jira/browse/FLINK-16662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17105982#comment-17105982
 ] 

xiemeilong edited comment on FLINK-16662 at 5/14/20, 5:08 AM:
--------------------------------------------------------------

[~zhanglibing1...@126.com] After upgrading to 1.10.1, I hit a similar issue.
{code:java}
Caused by: org.apache.flink.table.api.ValidationException: Given parameters of 
function 'generateDecoder' do not match any signature. 
Actual: (com.yunmo.iot.schema.RecordFormat, com.yunmo.iot.schema.Schema) 
Expected: (com.yunmo.iot.schema.RecordFormat, com.yunmo.iot.schema.Schema)
{code}



> Blink Planner failed to generate JobGraph for POJO DataStream converting to 
> Table (Cannot determine simple type name)
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-16662
>                 URL: https://issues.apache.org/jira/browse/FLINK-16662
>             Project: Flink
>          Issue Type: Bug
>          Components: Client / Job Submission, Table SQL / Planner
>    Affects Versions: 1.10.0
>            Reporter: chenxyz
>            Assignee: LionelZ
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.10.1, 1.11.0
>
>
> When the Blink planner converts a POJO DataStream to a Table, it generates 
> and compiles the SourceConversion$1 code. If the job is submitted to Flink 
> as a Jar, the UserCodeClassLoader is not used when generating the JobGraph, 
> so the ClassLoader of the compiled code (AppClassLoader) cannot load the 
> POJO class from the Jar package, and the following error is reported:
>  
> {code:java}
> Caused by: org.codehaus.commons.compiler.CompileException: Line 27, Column 174: Cannot determine simple type name "net"
>     at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
>     at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
>     at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
>     at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
>     at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
>     at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
>     at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
>     at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
>     at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:7009)
>     at org.codehaus.janino.UnitCompiler.access$15200(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$21$2.visitCast(UnitCompiler.java:6425)
>     at org.codehaus.janino.UnitCompiler$21$2.visitCast(UnitCompiler.java:6403)
>     at org.codehaus.janino.Java$Cast.accept(Java.java:4887)
>     at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
>     at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
>     at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
>     at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
>     at org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:9150)
>     at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:9036)
>     at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8938)
>     at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
>     at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
>     at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
>     at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
>     at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
>     at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
>     at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5019)
>     at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416)
>     at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394)
>     at org.codehaus.janino.Java$Cast.accept(Java.java:4887)
>     at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
>     at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
>     at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5019)
>     at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416)
>     at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394)
>     at org.codehaus.janino.Java$Cast.accept(Java.java:4887)
>     at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
>     at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2580)
>     at org.codehaus.janino.UnitCompiler.access$2700(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1503)
>     at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1487)
>     at org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3511)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
>     at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
>     at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
>     at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
>     at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
>     at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
>     at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
>     at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
>     at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
>     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
>     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
>     at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78)
>     ... 20 more
> // generate class
> /* 1 */
> /* 2 */      public class SourceConversion$1 extends org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
> /* 3 */          implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
> /* 4 */
> /* 5 */        private final Object[] references;
> /* 6 */        private transient org.apache.flink.table.dataformat.DataFormatConverters.PojoConverter converter$0;
> /* 7 */        private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
> /* 8 */
> /* 9 */        public SourceConversion$1(
> /* 10 */            Object[] references,
> /* 11 */            org.apache.flink.streaming.runtime.tasks.StreamTask task,
> /* 12 */            org.apache.flink.streaming.api.graph.StreamConfig config,
> /* 13 */            org.apache.flink.streaming.api.operators.Output output) throws Exception {
> /* 14 */          this.references = references;
> /* 15 */          converter$0 = (((org.apache.flink.table.dataformat.DataFormatConverters.PojoConverter) references[0]));
> /* 16 */          this.setup(task, config, output);
> /* 17 */        }
> /* 18 */
> /* 19 */        @Override
> /* 20 */        public void open() throws Exception {
> /* 21 */          super.open();
> /* 22 */
> /* 23 */        }
> /* 24 */
> /* 25 */        @Override
> /* 26 */        public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
> /* 27 */          org.apache.flink.table.dataformat.BaseRow in1 = (org.apache.flink.table.dataformat.BaseRow) (org.apache.flink.table.dataformat.BaseRow) converter$0.toInternal((net.xxxxxxxxxx.Student) element.getValue());
> /* 28 */
> /* 29 */
> /* 30 */
> /* 31 */          output.collect(outElement.replace(in1));
> /* 32 */        }
> /* 33 */
> /* 34 */
> /* 35 */
> /* 36 */        @Override
> /* 37 */        public void close() throws Exception {
> /* 38 */           super.close();
> /* 39 */
> /* 40 */        }
> /* 41 */
> /* 42 */
> /* 43 */      }
> /* 44 */
> {code}
> I think the UserCodeClassLoader should be used when generating the 
> JobGraph, just as it is when generating the Pipeline (StreamGraph).
> The test code is as follows:
>  
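> {code:java}
> // Imports use the Flink 1.10 package names; the POJO relies on Lombok.
> import lombok.AllArgsConstructor;
> import lombok.Getter;
> import lombok.NoArgsConstructor;
> import lombok.Setter;
> import org.apache.flink.api.common.time.Time;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.core.fs.FileSystem;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableConfig;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.sinks.CsvTableSink;
>
> public class App {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         EnvironmentSettings envSet =
>                 EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSet);
>         // Checkpointing must be enabled to trigger the bug (condition 2 below).
>         env.enableCheckpointing(2 * 60 * 1000);
>         TableConfig config = tableEnv.getConfig();
>         config.setIdleStateRetentionTime(Time.hours(24), Time.hours(25));
>         // Source that emits a single POJO record.
>         DataStreamSource<Student> source = env.addSource(new SourceFunction<Student>() {
>             @Override
>             public void run(SourceContext<Student> ctx) throws Exception {
>                 ctx.collect(new Student(1, "Tom"));
>             }
>             @Override
>             public void cancel() {
>             }
>         });
>         // Convert the POJO DataStream to a Table (condition 1 below).
>         tableEnv.createTemporaryView("student", source, "id, name");
>         Table table = tableEnv.sqlQuery("select id, name from student");
>         CsvTableSink sink =
>                 new CsvTableSink("/data/student", ",", 10, FileSystem.WriteMode.OVERWRITE);
>         String[] fieldNames = {"id", "name"};
>         TypeInformation[] fieldTypes = {Types.INT, Types.STRING};
>         tableEnv.registerTableSink("student_sink", fieldNames, fieldTypes, sink);
>         table.insertInto("student_sink");
>         env.execute("Test_Jar");
>     }
>
>     // POJO defined inside the user Jar; this is the class the generated code cannot resolve.
>     @Getter
>     @Setter
>     @NoArgsConstructor
>     @AllArgsConstructor
>     public static class Student {
>         private Integer id;
>         private String name;
>     }
> }{code}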
> To reproduce this bug, the following conditions must be met:
> 1. A POJO DataStream is converted to a Table.
> 2. Checkpointing is enabled (StreamingJobGraphGenerator#preValidate() checks 
> whether checkpointing is enabled).
> 3. The program is packaged into a Jar and submitted to Flink, or 
> PackagedProgramUtils.createJobGraph is invoked directly to create the 
> JobGraph from the Jar program.
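
The suggestion in the quoted description (use the user-code class loader while the JobGraph is generated) can be illustrated with a generic Java pattern: temporarily install a class loader as the thread context class loader and restore the previous one afterwards. This is only a minimal sketch and not Flink's actual fix. The class name ContextClassLoaderSketch and the helper withContextClassLoader are hypothetical, the empty URLClassLoader merely stands in for the loader that would hold the user Jar, and the sketch assumes the code that compiles the generated class resolves types through the thread context class loader.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.Callable;

public class ContextClassLoaderSketch {

    /**
     * Hypothetical helper: runs the action with userLoader installed as the
     * thread context class loader and always restores the previous loader.
     */
    static <T> T withContextClassLoader(ClassLoader userLoader, Callable<T> action)
            throws Exception {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(userLoader);
        try {
            return action.call();
        } finally {
            // Restore the original loader even if the action throws.
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader appLoader = ContextClassLoaderSketch.class.getClassLoader();
        // Stand-in for a user-code class loader (assumption): no URLs, parent = app loader.
        try (URLClassLoader userLoader = new URLClassLoader(new URL[0], appLoader)) {
            // Inside the action, code that consults the context class loader sees userLoader.
            ClassLoader seen = withContextClassLoader(
                    userLoader, () -> Thread.currentThread().getContextClassLoader());
            System.out.println("user loader visible inside action: " + (seen == userLoader));
            System.out.println("previous loader restored afterwards: "
                    + (Thread.currentThread().getContextClassLoader() != userLoader));
        }
    }
}
```

In a real job submission the action would be whatever step triggers compilation of the generated code, and the loader would be the one created for the user Jar; both are left abstract here on purpose.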



