[ 
https://issues.apache.org/jira/browse/FLINK-29546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755467#comment-17755467
 ] 

yong yang commented on FLINK-29546:
-----------------------------------

I have the same error in Flink 1.15.0.

 

-- Reproduction table: a Kafka source whose single physical column `log` holds the
-- record value as a string ('format' = 'raw'); every other column is computed from it.
-- NOTE(review): get_json_object is presumably the Scala UDF posted later in this
-- message (registered under that name) -- confirm it is registered before this DDL runs.
CREATE TABLE t2
(
    `log` STRING,
    -- Two-argument form: JSON text and a path.
    `uid` AS get_json_object(`log`,'$.data.uid'),
    `phoneCode` AS get_json_object(`log`,'$.data.phoneCode'),
    `isProduct` AS get_json_object(`log`, '$.data.isProduct'),
    -- Three-argument form with a numeric third argument (0).
    -- Presumably this is the fallback value of the UDF's Long overload -- TODO confirm.
    `ct` AS  get_json_object(`log`,'$.data.dialogDO.ctime',0),
    -- Same extraction, converted with TO_TIMESTAMP_LTZ(value, 0).
    -- NOTE(review): precision 0 means the value is read as epoch seconds -- verify
    -- that ctime is in seconds rather than milliseconds.
    `ct1` AS TO_TIMESTAMP_LTZ(
        get_json_object(`log`, '$.data.dialogDO.ctime',0)
        ,0
        ),
    -- Processing-time attribute.
    proc_time AS PROCTIME()
    -- Event-time attribute on ct1; the watermark trails ct1 by 15 seconds.
    , WATERMARK FOR `ct1` AS `ct1` - INTERVAL '15' SECOND
) WITH (
      'connector' = 'kafka',
      'properties.bootstrap.servers' = 'localhost:9092',
      'scan.startup.mode' = 'earliest-offset',
      'topic' = 'councilFeedbackRealtime',
      'properties.group.id' = 'aaa',
      'format' = 'raw'
      );

 

 
package com.tuya.tlink.udf.common

import com.alibaba.fastjson2.\{JSONPath, JSONReader}
import org.apache.flink.table.functions.ScalarFunction

import scala.util.Try
import org.apache.flink.configuration.\{Configuration, RestOptions}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.Expressions.row
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object GetJsonObject {

  /**
   * Local smoke test for the `get_json_object` UDF.
   *
   * Starts a local streaming environment with the web UI, registers a one-row
   * in-memory table as view `tb1`, registers the UDF class, and runs a query that
   * asks for a key (`$.k3`) that is not present in the sample row, passing `999`
   * as the third argument. The result schema and rows are printed to stdout.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Local environment with the web UI bound to a fixed REST port.
    val restConf = new Configuration
    restConf.setInteger(RestOptions.PORT, 38080)
    val streamEnv = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(restConf)
    val tableEnv = StreamTableEnvironment.create(streamEnv)

    // Single sample row: (id DECIMAL(10, 2), name STRING); `name` carries the JSON text.
    val sampleRows = tableEnv.fromValues(
      DataTypes.ROW(
        DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
        DataTypes.FIELD("name", DataTypes.STRING())
      ),
      row(2L, """\{"k1":"v111","k2":100}""")
    )

    tableEnv.createTemporaryView("tb1", sampleRows)
    tableEnv.createFunction("get_json_object", classOf[GetJsonObject])

    val query =
      """
        | select id,get_json_object(name,'$.k3',999) as v from tb1
        |""".stripMargin
    val result = tableEnv.executeSql(query)

    println(result.getTableSchema)
    result.print()
  }

}

/**
 * Scalar UDF that extracts a single value from a JSON string.
 *
 * The path is resolved one key at a time. After each step the intermediate result
 * is converted to a string and parsed again for the next step, so a string value
 * that itself contains JSON text (for example JSON that was escaped with extra
 * backslashes) can still be descended into. Paths may be written as `$.f1.f2` or
 * as a bare key such as `f1`.
 *
 * A null input, a missing key, a JSON null value, or unparsable JSON at any step
 * is treated as "no value": resolution stops there and the caller's fallback is
 * returned.
 */
class GetJsonObject extends ScalarFunction {

  /**
   * Extracts the value at `path` as a string.
   *
   * @param json JSON text; an empty string is accepted and yields the fallback
   * @param path JSON path such as `$.f1.f2`, or a bare key such as `f1`
   * @return the extracted value rendered as a string, or `""` if it cannot be resolved
   */
  def eval(json: String, path: String): String =
    extractValue(json, path).getOrElse("")

  /**
   * Extracts the value at `path` as a string, with a caller-supplied fallback.
   *
   * @param json         JSON text
   * @param path         JSON path such as `$.f1.f2`, or a bare key such as `f1`
   * @param defaultValue returned when the value cannot be resolved
   * @return the extracted value rendered as a string, or `defaultValue`
   */
  def eval(json: String, path: String, defaultValue: String): String =
    extractValue(json, path).getOrElse(defaultValue)

  /**
   * Extracts the value at `path` as a `Long`, with a caller-supplied fallback.
   *
   * @param json         JSON text
   * @param path         JSON path such as `$.f1.f2`, or a bare key such as `f1`
   * @param defaultValue returned when the value cannot be resolved or is not a valid long
   * @return the extracted value parsed as a `Long`, or `defaultValue`
   */
  def eval(json: String, path: String, defaultValue: Long): Long =
    extractValue(json, path)
      // A value that is present but not numeric must not fail the whole job:
      // fall back to the default instead of letting NumberFormatException escape.
      .flatMap(text => Try(text.toLong).toOption)
      .getOrElse(defaultValue)

  /**
   * Walks `path` one key at a time and returns the final value as a string, or
   * `None` as soon as any step cannot be resolved.
   */
  private def extractValue(json: String, path: String): Option[String] =
    if (json == null || path == null) {
      None
    } else {
      // "$.f1.f2" or "f1.f2" -> ["$.f1", "$.f2"]; "f1" -> ["$.f1"]
      val steps = path.replace("$.", "").split('.').map(key => s"$$.$key")

      steps.foldLeft(Option(json)) { (current, step) =>
        current.flatMap { document =>
          // extract() returns null for a missing key or a JSON null; Option(...)
          // turns that into None. Try absorbs parse and path-syntax failures.
          Try(Option(JSONPath.of(step).extract(JSONReader.of(document))).map(_.toString))
            .toOption
            .flatten
        }
      }
    }

}

> UDF:Failed to compile split code, falling back to original code
> ---------------------------------------------------------------
>
>                 Key: FLINK-29546
>                 URL: https://issues.apache.org/jira/browse/FLINK-29546
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.15.0
>         Environment: my pom:
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-table-api-java-uber</artifactId>
> </dependency>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-table-runtime</artifactId>
> </dependency>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-table-planner-loader</artifactId>
> </dependency>
> jdk 1.8
>            Reporter: Hui Wang
>            Priority: Major
>         Attachments: image-2023-08-17-18-18-50-937.png
>
>
> 2022-10-08 19:05:23 [GroupWindowAggregate[11] (1/1)#0] WARN  
> org.apache.flink.table.runtime.generated.GeneratedClass -Failed to compile 
> split code, falling back to original code
> org.apache.flink.util.FlinkRuntimeException: 
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94)
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:97)
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68)
>     at 
> org.apache.flink.table.runtime.operators.window.AggregateWindowOperator.compileGeneratedCode(AggregateWindowOperator.java:148)
>     at 
> org.apache.flink.table.runtime.operators.window.WindowOperator.open(WindowOperator.java:274)
>     at 
> org.apache.flink.table.runtime.operators.window.AggregateWindowOperator.open(AggregateWindowOperator.java:139)
>     at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException:
>  org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859)
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92)
>     ... 15 common frames omitted
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107)
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4864)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
>     ... 18 common frames omitted
> Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column 
> 28: Cannot determine simple type name "org"
>     at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211)
>     at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6833)
>     at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6594)
>     at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607)
>     at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607)
>     at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607)
>     at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607)
>     at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607)
>     at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607)
>     at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6573)
>     at org.codehaus.janino.UnitCompiler.access$13900(UnitCompiler.java:215)
>     at 
> org.codehaus.janino.UnitCompiler$22$1.visitReferenceType(UnitCompiler.java:6481)
>     at 
> org.codehaus.janino.UnitCompiler$22$1.visitReferenceType(UnitCompiler.java:6476)
>     at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3928)
>     at org.codehaus.janino.UnitCompiler$22.visitType(UnitCompiler.java:6476)
>     at org.codehaus.janino.UnitCompiler$22.visitType(UnitCompiler.java:6469)
>     at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3927)
>     at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6469)
>     at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$25.getType(UnitCompiler.java:8271)
>     at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6873)
>     at org.codehaus.janino.UnitCompiler.access$14400(UnitCompiler.java:215)
>     at 
> org.codehaus.janino.UnitCompiler$22$2$1.visitFieldAccess(UnitCompiler.java:6499)
>     at 
> org.codehaus.janino.UnitCompiler$22$2$1.visitFieldAccess(UnitCompiler.java:6494)
>     at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4310)
>     at 
> org.codehaus.janino.UnitCompiler$22$2.visitLvalue(UnitCompiler.java:6494)
>     at 
> org.codehaus.janino.UnitCompiler$22$2.visitLvalue(UnitCompiler.java:6490)
>     at org.codehaus.janino.Java$Lvalue.accept(Java.java:4148)
>     at org.codehaus.janino.UnitCompiler$22.visitRvalue(UnitCompiler.java:6490)
>     at org.codehaus.janino.UnitCompiler$22.visitRvalue(UnitCompiler.java:6469)
>     at org.codehaus.janino.Java$Rvalue.accept(Java.java:4116)
>     at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6469)
>     at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6855)
>     at org.codehaus.janino.UnitCompiler.access$14200(UnitCompiler.java:215)
>     at 
> org.codehaus.janino.UnitCompiler$22$2$1.visitAmbiguousName(UnitCompiler.java:6497)
>     at 
> org.codehaus.janino.UnitCompiler$22$2$1.visitAmbiguousName(UnitCompiler.java:6494)
>     at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4224)
>     at 
> org.codehaus.janino.UnitCompiler$22$2.visitLvalue(UnitCompiler.java:6494)
>     at 
> org.codehaus.janino.UnitCompiler$22$2.visitLvalue(UnitCompiler.java:6490)
>     at org.codehaus.janino.Java$Lvalue.accept(Java.java:4148)
>     at org.codehaus.janino.UnitCompiler$22.visitRvalue(UnitCompiler.java:6490)
>     at org.codehaus.janino.UnitCompiler$22.visitRvalue(UnitCompiler.java:6469)
>     at org.codehaus.janino.Java$Rvalue.accept(Java.java:4116)
>     at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6469)
>     at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:9026)
>     at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5062)
>     at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
>     at 
> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423)
>     at 
> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396)
>     at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
>     at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
>     at 
> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3783)
>     at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
>     at 
> org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3762)
>     at 
> org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3734)
>     at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3734)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
>     at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
>     at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
>     at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
>     at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2874)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
>     at 
> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
>     at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
>     at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
>     at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
>     at 
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
>     at 
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
>     at 
> org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
>     at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
>     at 
> org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
>     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
>     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:104)
>     ... 24 common frames omitted



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to