[
https://issues.apache.org/jira/browse/FLINK-25727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17479776#comment-17479776
]
luoyuxia edited comment on FLINK-25727 at 1/27/22, 10:53 AM:
-------------------------------------------------------------
This is an issue with using Hive functions in Flink SQL. It can be reproduced
with the following code in HiveDialectITCase:
{code:java}
tableEnv.executeSql("create table foo (x int, y int)");
tableEnv.executeSql(
        "select foo.x, b.role_id from foo, lateral"
                + " table(json_tuple('{\"a\":\"0\",\"b\":\"1\"}', 'role')) AS b(role_id)");
{code}
The reason is that in the implementation, the parameters of json_tuple are
typed as CHAR in Flink SQL instead of STRING. I'll try to fix it so that such
SQL can work in Flink.
Currently, you can use
{code:java}
json_tuple(repeat('{\"a\":\"0\",\"b\":\"1\"}', 1), repeat('a', 1))
{code}
as a workaround.
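Applied to the reproducing query above, the workaround might look like the following sketch. The idea is that repeat(..., 1) is a no-op on the value but its return type is STRING, so the literal no longer reaches Hive's json_tuple as CHAR:

{code:sql}
-- fails: the literals are inferred as CHAR(n), which json_tuple rejects
SELECT foo.x, b.role_id
FROM foo, LATERAL TABLE(json_tuple('{"a":"0","b":"1"}', 'role')) AS b(role_id);

-- works around it: repeat(..., 1) returns STRING, satisfying json_tuple's
-- "arguments have to be string type" check
SELECT foo.x, b.role_id
FROM foo, LATERAL TABLE(json_tuple(repeat('{"a":"0","b":"1"}', 1), repeat('role', 1))) AS b(role_id);
{code}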
> Flink SQL convert constant string to char type which cause hive udtf
> json_tuple not work
> ----------------------------------------------------------------------------------------
>
> Key: FLINK-25727
> URL: https://issues.apache.org/jira/browse/FLINK-25727
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Hive
> Affects Versions: 1.14.2
> Environment: Flink 1.14.2
> Hive 2.3.9
> Reporter: syntomic
> Priority: Not a Priority
> Labels: features
> Attachments: image-2022-01-20-17-44-58-478.png
>
>
> Flink SQL(use default dialect) is:
> {code:java}
> SELECT
> a.`log`,
> b.`role_id`
> FROM
> tmp_kafka a, lateral table(json_tuple(`log`, 'role_id')) AS b(`role_id`);
> {code}
> Exception is:
> {code:java}
> org.apache.flink.table.api.ValidationException: SQL validation failed.
> java.lang.reflect.InvocationTargetException
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:164)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:215)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:716)
> at
> org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:106)
> at
> org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInnerSelect(FlinkStreamSqlInterpreter.java:86)
> at
> org.apache.zeppelin.flink.FlinkSqlInterpreter.callSelect(FlinkSqlInterpreter.java:494)
> at
> org.apache.zeppelin.flink.FlinkSqlInterpreter.callCommand(FlinkSqlInterpreter.java:257)
> at
> org.apache.zeppelin.flink.FlinkSqlInterpreter.runSqlList(FlinkSqlInterpreter.java:151)
> at
> org.apache.zeppelin.flink.FlinkSqlInterpreter.internalInterpret(FlinkSqlInterpreter.java:109)
> at
> org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:55)
> at
> org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
> at
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:860)
> at
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:752)
> at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
> at
> org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
> at
> org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException:
> java.lang.reflect.InvocationTargetException
> at
> org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeGetResultType(HiveFunctionUtils.java:83)
> at
> org.apache.flink.table.planner.functions.utils.HiveTableSqlFunction.getRowType(HiveTableSqlFunction.java:116)
> at
> org.apache.flink.table.planner.functions.utils.TableSqlFunction$$anon$1.inferReturnType(TableSqlFunction.scala:89)
> at
> org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:69)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3085)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3070)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3133)
> at
> org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:117)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3076)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3335)
> at
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:159)
> ... 20 more
> Caused by: java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeGetResultType(HiveFunctionUtils.java:76)
> ... 40 more
> Caused by: org.apache.flink.table.functions.hive.FlinkHiveUDFException:
> org.apache.hadoop.hive.ql.exec.UDFArgumentException: json_tuple()'s arguments
> have to be string type
> at
> org.apache.flink.table.functions.hive.HiveGenericUDTF.getHiveResultType(HiveGenericUDTF.java:146)
> ... 45 more
> Caused by: org.apache.hadoop.hive.ql.exec.UDFArgumentException:
> json_tuple()'s arguments have to be string type
> at
> org.apache.hadoop.hive.ql.udf.generic.GenericUDTFJSONTuple.initialize(GenericUDTFJSONTuple.java:118)
> at
> org.apache.flink.table.functions.hive.HiveGenericUDTF.getHiveResultType(HiveGenericUDTF.java:144)
> ... 45 more
> {code}
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)