[ 
https://issues.apache.org/jira/browse/FLINK-25727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482959#comment-17482959
 ] 

luoyuxia edited comment on FLINK-25727 at 1/27/22, 8:23 AM:
------------------------------------------------------------

After some debugging, I found that the main cause is that the Hive dialect 
`lateral table` syntax is not supported in the Hive planner, so parsing the 
lateral table fails and the query falls back to the Flink dialect. In the 
Flink dialect, the string literal is treated as CHAR type, but the Hive 
function json_tuple requires STRING type.
To fix it, I think the best way is to support the Hive dialect `lateral table` 
in the Hive planner. I'm not sure why it's not supported in the current code, 
but I'll try to do that.
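
Until that is supported, a possible workaround might be to cast the key 
literal explicitly, so that json_tuple receives a STRING argument instead of 
a CHAR one. This is only a sketch and has not been verified against Flink 
1.14.2. It assumes the explicit cast is preserved through validation. The 
table and column names are taken from the query in the issue description.
{code:sql}
-- Untested sketch: CAST makes the literal STRING rather than CHAR(7),
-- which is the type json_tuple rejects in the stack trace.
SELECT
    a.`log`,
    b.`role_id`
FROM
    tmp_kafka a,
    lateral table(json_tuple(`log`, CAST('role_id' AS STRING))) AS b(`role_id`);
{code}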



> Flink SQL converts constant string to char type, which causes Hive UDTF 
> json_tuple to not work
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-25727
>                 URL: https://issues.apache.org/jira/browse/FLINK-25727
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive
>    Affects Versions: 1.14.2
>         Environment: Flink 1.14.2
> Hive 2.3.9
>            Reporter: syntomic
>            Priority: Not a Priority
>              Labels: easyfix
>         Attachments: image-2022-01-20-17-44-58-478.png
>
>
> Flink SQL is:
> {code:java}
> SELECT
>     a.`log`,
>     b.`role_id`
> FROM
>     tmp_kafka a, lateral table(json_tuple(`log`, 'role_id')) AS b(`role_id`); 
> {code}
> Exception is:
> {code:java}
> org.apache.flink.table.api.ValidationException: SQL validation failed. 
> java.lang.reflect.InvocationTargetException
>       at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:164)
>       at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
>       at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:215)
>       at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>       at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:716)
>       at 
> org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:106)
>       at 
> org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInnerSelect(FlinkStreamSqlInterpreter.java:86)
>       at 
> org.apache.zeppelin.flink.FlinkSqlInterpreter.callSelect(FlinkSqlInterpreter.java:494)
>       at 
> org.apache.zeppelin.flink.FlinkSqlInterpreter.callCommand(FlinkSqlInterpreter.java:257)
>       at 
> org.apache.zeppelin.flink.FlinkSqlInterpreter.runSqlList(FlinkSqlInterpreter.java:151)
>       at 
> org.apache.zeppelin.flink.FlinkSqlInterpreter.internalInterpret(FlinkSqlInterpreter.java:109)
>       at 
> org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:55)
>       at 
> org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
>       at 
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:860)
>       at 
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:752)
>       at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
>       at 
> org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
>       at 
> org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: 
> java.lang.reflect.InvocationTargetException
>       at 
> org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeGetResultType(HiveFunctionUtils.java:83)
>       at 
> org.apache.flink.table.planner.functions.utils.HiveTableSqlFunction.getRowType(HiveTableSqlFunction.java:116)
>       at 
> org.apache.flink.table.planner.functions.utils.TableSqlFunction$$anon$1.inferReturnType(TableSqlFunction.scala:89)
>       at 
> org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:69)
>       at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>       at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>       at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
>       at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3085)
>       at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3070)
>       at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3133)
>       at 
> org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:117)
>       at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3076)
>       at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3335)
>       at 
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>       at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>       at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>       at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
>       at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
>       at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
>       at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
>       at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:159)
>       ... 20 more
> Caused by: java.lang.reflect.InvocationTargetException
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeGetResultType(HiveFunctionUtils.java:76)
>       ... 40 more
> Caused by: org.apache.flink.table.functions.hive.FlinkHiveUDFException: 
> org.apache.hadoop.hive.ql.exec.UDFArgumentException: json_tuple()'s arguments 
> have to be string type
>       at 
> org.apache.flink.table.functions.hive.HiveGenericUDTF.getHiveResultType(HiveGenericUDTF.java:146)
>       ... 45 more
> Caused by: org.apache.hadoop.hive.ql.exec.UDFArgumentException: 
> json_tuple()'s arguments have to be string type
>       at 
> org.apache.hadoop.hive.ql.udf.generic.GenericUDTFJSONTuple.initialize(GenericUDTFJSONTuple.java:118)
>       at 
> org.apache.flink.table.functions.hive.HiveGenericUDTF.getHiveResultType(HiveGenericUDTF.java:144)
>       ... 45 more
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
