[
https://issues.apache.org/jira/browse/FLINK-16662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Nico Kruber updated FLINK-16662:
--------------------------------
Priority: Blocker (was: Major)
> Blink Planner failed to generate JobGraph for POJO DataStream converting to
> Table (Cannot determine simple type name)
> ---------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-16662
> URL: https://issues.apache.org/jira/browse/FLINK-16662
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.10.0
> Reporter: chenxyz
> Assignee: LionelZ
> Priority: Blocker
>
> When the Blink Planner converts a POJO DataStream to a Table, it generates
> and compiles the SourceConversion$1 code. If the job is submitted to Flink
> as a Jar, the UserCodeClassLoader is not used when generating the JobGraph,
> so the ClassLoader of the compiled code (AppClassLoader) cannot load the
> POJO class from the Jar package, and the following error is reported:
>
> {code:java}
> Caused by: org.codehaus.commons.compiler.CompileException: Line 27, Column 174: Cannot determine simple type name "net"
>     at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
>     at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
>     at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
>     at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
>     at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
>     at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
>     at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
>     at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
>     at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:7009)
>     at org.codehaus.janino.UnitCompiler.access$15200(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$21$2.visitCast(UnitCompiler.java:6425)
>     at org.codehaus.janino.UnitCompiler$21$2.visitCast(UnitCompiler.java:6403)
>     at org.codehaus.janino.Java$Cast.accept(Java.java:4887)
>     at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
>     at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
>     at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
>     at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
>     at org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:9150)
>     at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:9036)
>     at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8938)
>     at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
>     at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
>     at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
>     at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
>     at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
>     at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
>     at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5019)
>     at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416)
>     at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394)
>     at org.codehaus.janino.Java$Cast.accept(Java.java:4887)
>     at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
>     at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
>     at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5019)
>     at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416)
>     at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394)
>     at org.codehaus.janino.Java$Cast.accept(Java.java:4887)
>     at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
>     at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2580)
>     at org.codehaus.janino.UnitCompiler.access$2700(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1503)
>     at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1487)
>     at org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3511)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
>     at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
>     at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
>     at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
>     at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
>     at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
>     at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
>     at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
>     at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
>     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
>     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
>     at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78)
>     ... 20 more
> // generated class
> /* 1 */
> /* 2 */ public class SourceConversion$1 extends org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
> /* 3 */     implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
> /* 4 */
> /* 5 */   private final Object[] references;
> /* 6 */   private transient org.apache.flink.table.dataformat.DataFormatConverters.PojoConverter converter$0;
> /* 7 */   private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
> /* 8 */
> /* 9 */   public SourceConversion$1(
> /* 10 */       Object[] references,
> /* 11 */       org.apache.flink.streaming.runtime.tasks.StreamTask task,
> /* 12 */       org.apache.flink.streaming.api.graph.StreamConfig config,
> /* 13 */       org.apache.flink.streaming.api.operators.Output output) throws Exception {
> /* 14 */     this.references = references;
> /* 15 */     converter$0 = (((org.apache.flink.table.dataformat.DataFormatConverters.PojoConverter) references[0]));
> /* 16 */     this.setup(task, config, output);
> /* 17 */   }
> /* 18 */
> /* 19 */   @Override
> /* 20 */   public void open() throws Exception {
> /* 21 */     super.open();
> /* 22 */
> /* 23 */   }
> /* 24 */
> /* 25 */   @Override
> /* 26 */   public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
> /* 27 */     org.apache.flink.table.dataformat.BaseRow in1 = (org.apache.flink.table.dataformat.BaseRow) (org.apache.flink.table.dataformat.BaseRow) converter$0.toInternal((net.xxxxxxxxxx.Student) element.getValue());
> /* 28 */
> /* 29 */
> /* 30 */
> /* 31 */     output.collect(outElement.replace(in1));
> /* 32 */   }
> /* 33 */
> /* 34 */
> /* 35 */
> /* 36 */   @Override
> /* 37 */   public void close() throws Exception {
> /* 38 */     super.close();
> /* 39 */
> /* 40 */   }
> /* 41 */
> /* 42 */
> /* 43 */ }
> /* 44 */ {code}
> I think the UserCodeClassLoader should be used when generating the JobGraph,
> just as it is when generating the Pipeline (StreamGraph).
> The test code is as follows:
>
> {code:java}
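> import lombok.AllArgsConstructor;
> import lombok.Getter;
> import lombok.NoArgsConstructor;
> import lombok.Setter;
> import org.apache.flink.api.common.time.Time;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.core.fs.FileSystem;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableConfig;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.sinks.CsvTableSink;
>
> public class App {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         EnvironmentSettings envSet =
>             EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSet);
>         // Checkpointing must be enabled to trigger the failure.
>         env.enableCheckpointing(2 * 60 * 1000);
>         TableConfig config = tableEnv.getConfig();
>         config.setIdleStateRetentionTime(Time.hours(24), Time.hours(25));
>         DataStreamSource<Student> source = env.addSource(new SourceFunction<Student>() {
>             @Override
>             public void run(SourceContext<Student> ctx) throws Exception {
>                 ctx.collect(new Student(1, "Tom"));
>             }
>
>             @Override
>             public void cancel() {
>             }
>         });
>         // Converting the POJO DataStream to a Table generates SourceConversion$1.
>         tableEnv.createTemporaryView("student", source, "id, name");
>         Table table = tableEnv.sqlQuery("select id, name from student");
>         CsvTableSink sink = new CsvTableSink("/data/student", ",", 10,
>             FileSystem.WriteMode.OVERWRITE);
>         String[] fieldNames = {"id", "name"};
>         TypeInformation[] fieldTypes = {Types.INT, Types.STRING};
>         tableEnv.registerTableSink("student_sink", fieldNames, fieldTypes, sink);
>         table.insertInto("student_sink");
>         env.execute("Test_Jar");
>     }
>
>     // POJO that lives only in the user Jar.
>     @Getter
>     @Setter
>     @NoArgsConstructor
>     @AllArgsConstructor
>     public static class Student {
>         private Integer id;
>         private String name;
>     }
> }
> {code}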
> To reproduce this bug, all of the following conditions must be met:
> 1. A POJO DataStream is converted to a Table
> 2. Checkpointing is enabled (StreamingJobGraphGenerator#preValidate() checks
> whether checkpointing is enabled)
> 3. The program is packaged into a Jar and submitted to Flink, or
> PackagedProgramUtils.createJobGraph is invoked to create the JobGraph from the Jar program
> directly
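
The visibility problem described in the issue can be shown in miniature without Flink or Janino. The sketch below is only an illustration under stated assumptions: ClassLoaderSketch, canResolve and withContextClassLoader are hypothetical names, a bootstrap-only URLClassLoader stands in for the loader that cannot see the user Jar, and the loader that defined the sketch class stands in for the UserCodeClassLoader. It is not the fix applied in Flink.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

// Hypothetical illustration; none of these names come from Flink.
final class ClassLoaderSketch {

    // Returns true if the given loader (or one of its parents) can load className.
    static boolean canResolve(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    // Runs the action with userLoader as the thread context class loader and
    // always restores the previous loader afterwards.
    static <T> T withContextClassLoader(ClassLoader userLoader, Supplier<T> action) {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(userLoader);
        try {
            return action.get();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        String userClass = ClassLoaderSketch.class.getName();
        // Stand-in for a loader that cannot see the user Jar: bootstrap classes only.
        ClassLoader narrow = new URLClassLoader(new URL[0], null);
        // Stand-in for the user code class loader: the loader that defined this class.
        ClassLoader user = ClassLoaderSketch.class.getClassLoader();

        System.out.println("narrow loader resolves user class: " + canResolve(userClass, narrow));
        System.out.println("user loader resolves user class: " + canResolve(userClass, user));

        // Code that looks up classes through the context class loader sees the
        // user class only while the user loader is installed.
        boolean resolved = withContextClassLoader(user,
                () -> canResolve(userClass, Thread.currentThread().getContextClassLoader()));
        System.out.println("resolved via context class loader: " + resolved);
    }
}
```

In the scenario from the issue, the analogous step would be to make the user code class loader available to whatever compiles the generated operator, so that a name such as the POJO class can be resolved during JobGraph generation.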
--
This message was sent by Atlassian Jira
(v8.3.4#803005)