[ 
https://issues.apache.org/jira/browse/FLINK-34016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17812134#comment-17812134
 ] 

xuyang commented on FLINK-34016:
--------------------------------

Hi, [~wczhu] . +1 Could not reproduce it on my local env.

Can you try this following steps?

Add the following options when you start the session:

 
{code:java}
env.java.opts.taskmanager: "-Dorg.codehaus.janino.source_debugging.enable=true 
-Dorg.codehaus.janino.source_debugging.dir=/flink/log/"{code}
Then you could see the generated files by codegen in the file path, and then 
attach the java file about WatermarkGenerator to this jira.

 

> Janino compile failed when watermark with column by udf
> -------------------------------------------------------
>
>                 Key: FLINK-34016
>                 URL: https://issues.apache.org/jira/browse/FLINK-34016
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.15.0, 1.18.0
>            Reporter: JJJJude
>            Priority: Major
>         Attachments: image-2024-01-25-11-53-06-158.png, 
> image-2024-01-25-11-54-54-381.png, image-2024-01-25-12-57-21-318.png, 
> image-2024-01-25-12-57-34-632.png
>
>
> After submit the following flink sql by sql-client.sh will throw an exception:
> {code:java}
> Caused by: java.lang.RuntimeException: Could not instantiate generated class 
> 'WatermarkGenerator$0'
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:74)
>     at 
> org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(GeneratedWatermarkGeneratorSupplier.java:69)
>     at 
> org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks.createMainOutput(ProgressiveTimestampsAndWatermarks.java:109)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.initializeMainOutput(SourceOperator.java:462)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:438)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:414)
>     at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkRuntimeException: 
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94)
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:101)
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68)
>     ... 16 more
> Caused by: 
> org.apache.flink.shaded.guava31.com.google.common.util.concurrent.UncheckedExecutionException:
>  org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2055)
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache.get(LocalCache.java:3966)
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4863)
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92)
>     ... 18 more
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107)
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92)
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4868)
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3533)
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2282)
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2159)
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2049)
>     ... 21 more
> Caused by: org.codehaus.commons.compiler.CompileException: Line 29, Column 
> 123: Line 29, Column 123: Cannot determine simple type name "org"
>     at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:7007)
>     at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6886)
>     at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6899)
>     at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6899)
>     at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6899)
>     at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6899)
>     at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6899)
>     at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6899)
>     at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6857)
>     at org.codehaus.janino.UnitCompiler.access$14800(UnitCompiler.java:237)
>     at 
> org.codehaus.janino.UnitCompiler$24.visitReferenceType(UnitCompiler.java:6755)
>     at 
> org.codehaus.janino.UnitCompiler$24.visitReferenceType(UnitCompiler.java:6752)
>     at org.codehaus.janino.Java$ReferenceType.accept(Java.java:4289)
>     at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6752)
>     at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:7532)
>     at org.codehaus.janino.UnitCompiler.access$17300(UnitCompiler.java:237)
>     at 
> org.codehaus.janino.UnitCompiler$25.visitNewClassInstance(UnitCompiler.java:6799)
>     at 
> org.codehaus.janino.UnitCompiler$25.visitNewClassInstance(UnitCompiler.java:6773)
>     at org.codehaus.janino.Java$NewClassInstance.accept(Java.java:5587)
>     at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6773)
>     at 
> org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:9621)
>     at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:9506)
>     at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:9422)
>     at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5263)
>     at org.codehaus.janino.UnitCompiler.access$9300(UnitCompiler.java:237)
>     at 
> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4766)
>     at 
> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4742)
>     at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5470)
>     at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4742)
>     at 
> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5885)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:4121)
>     at org.codehaus.janino.UnitCompiler.access$6100(UnitCompiler.java:237)
>     at 
> org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:4096)
>     at 
> org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:4071)
>     at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5470)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:4071)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2524)
>     at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:237)
>     at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1581)
>     at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1576)
>     at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:3209)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1576)
>     at 
> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1662)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3665)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3330)
>     at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1448)
>     at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1421)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:830)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:443)
>     at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:237)
>     at 
> org.codehaus.janino.UnitCompiler$3.visitPackageMemberClassDeclaration(UnitCompiler.java:423)
>     at 
> org.codehaus.janino.UnitCompiler$3.visitPackageMemberClassDeclaration(UnitCompiler.java:419)
>     at 
> org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1688)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:419)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:393)
>     at org.codehaus.janino.UnitCompiler.access$000(UnitCompiler.java:237)
>     at 
> org.codehaus.janino.UnitCompiler$2.visitCompilationUnit(UnitCompiler.java:364)
>     at 
> org.codehaus.janino.UnitCompiler$2.visitCompilationUnit(UnitCompiler.java:362)
>     at org.codehaus.janino.Java$CompilationUnit.accept(Java.java:371)
>     at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:362)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:273)
>     at 
> org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:526)
>     at org.codehaus.janino.SimpleCompiler.cook2(SimpleCompiler.java:250)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:229)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:219)
>     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:82)
>     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:77)
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:104)
>     ... 27 more
> Caused by: org.codehaus.commons.compiler.CompileException: Line 29, Column 
> 123: Cannot determine simple type name "org"
>     at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:13080)
>     at 
> org.codehaus.janino.UnitCompiler.getRawReferenceType(UnitCompiler.java:7175)
>     at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:7005)
>     ... 94 more
>  {code}
>  
> flink sql: 
> {code:java}
> CREATE TABLE default_catalog.default_database.KafkaTable (
>   `block` STRING,
>   `de` STRING,
>   `http_path` STRING,
>   `logType` STRING,
>   `mod` STRING,
>   `pb_city_id` STRING,
>   `pb_nation_id` STRING,
>   `timestamp` BIGINT,
>   `v_lineNum` STRING,
>   `v_timestamp` BIGINT,
>    ts AS COALESCE(TO_TIMESTAMP(
>         FROM_UNIXTIME(`timestamp` / 1000, 'yyyy-MM-dd HH:mm:ss')
>     ),CURRENT_TIMESTAMP),
>     WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'kafka_topic_01',
>   'properties.bootstrap.servers' = '*****',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'json'
> );
> CREATE TABLE default_catalog.default_database.table_print(
>   `block` STRING,
>   `de` STRING,
>   `http_path` STRING,
>   `logType` STRING,
>   `mod` STRING,
>   `pb_city_id` STRING,
>   `pb_nation_id` STRING,
>   `timestamp` BIGINT,
>   `v_lineNum` STRING,
>   `v_timestamp` BIGINT,
>    ts TIMESTAMP
> ) WITH(
>     'connector' = 'print',
>     'print-identifier' = '===== PrintResult: ====='
> );
> insert into default_catalog.default_database.table_print select * from 
> default_catalog.default_database.KafkaTable; {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to