Hi,
I need help with this issue. Here is what Flink reports when I enable
checkpointing on the StreamExecutionEnvironment:

public class SourceConversion$1 extends org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
    implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {

  private final Object[] references;
  private transient org.apache.flink.table.dataformat.DataFormatConverters.RowConverter converter$0;
  private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement =
      new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);

  public SourceConversion$1(
      Object[] references,
      org.apache.flink.streaming.runtime.tasks.StreamTask task,
      org.apache.flink.streaming.api.graph.StreamConfig config,
      org.apache.flink.streaming.api.operators.Output output) throws Exception {
    this.references = references;
    converter$0 = (((org.apache.flink.table.dataformat.DataFormatConverters.RowConverter) references[0]));
    this.setup(task, config, output);
  }

  @Override
  public void open() throws Exception {
    super.open();
  }

  @Override
  public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
    org.apache.flink.table.dataformat.BaseRow in1 =
        (org.apache.flink.table.dataformat.BaseRow) (org.apache.flink.table.dataformat.BaseRow)
        converter$0.toInternal((org.apache.flink.types.Row) element.getValue());

    output.collect(outElement.replace(in1));
  }

  @Override
  public void close() throws Exception {
    super.close();
  }
}

Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
    at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
    at org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:96)
    at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:62)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:214)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:104)
    at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:777)
    at org.apache.flink.streaming.api.graph.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
    at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:57)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:85)

Janino compiles the generated code successfully when I remove the
checkpoint setting from the environment.

Can anyone help with this?

Thanks,
Bin
