[
https://issues.apache.org/jira/browse/FLINK-10019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Rong Rong updated FLINK-10019:
------------------------------
Description:
Explicitly returning a CompositeType from {{udf.getResultType}} results in
failures in chained operators.
For example: consider a simple UDF,
{code:scala}
object Func extends ScalarFunction {
def eval(row: Row): Row = {
row
}
override def getParameterTypes(signature: Array[Class[_]]):
Array[TypeInformation[_]] =
Array(Types.ROW(Types.INT))
override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
Types.ROW(Types.INT)
}
{code}
This should work, since the UDF is a simple pass-through. However, the second query in the following test fails:
{code:scala}
@Test
def testRowType(): Unit = {
val data = List(
Row.of(Row.of(12.asInstanceOf[Integer]), "1")
)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromCollection(data)(Types.ROW(Types.ROW(Types.INT),
Types.STRING))
val tEnv = TableEnvironment.getTableEnvironment(env)
val table = stream.toTable(tEnv, 'a, 'b)
tEnv.registerFunction("func", Func)
tEnv.registerTable("t", table)
// This works perfectly
val result1 = tEnv.sqlQuery("SELECT func(a) FROM t").toAppendStream[Row]
result1.addSink(new StreamITCase.StringSink[Row])
// This throws exception
val result2 = tEnv.sqlQuery("SELECT func(a) as myRow FROM t").toAppendStream[Row]
result2.addSink(new StreamITCase.StringSink[Row])
env.execute()
}
{code}
Exception stack trace:
{code:java}
java.lang.IndexOutOfBoundsException: index (1) must be less than size (1)
at
com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:310)
at
com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:293)
at
com.google.common.collect.SingletonImmutableList.get(SingletonImmutableList.java:41)
at
org.apache.calcite.sql.type.InferTypes$2.inferOperandTypes(InferTypes.java:83)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1777)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:459)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.expandStar(SqlValidatorImpl.java:349)
...
{code}
This is because Calcite's {{InferTypes.inferOperandTypes}} does not expect to
infer a struct {{RelDataType}}.
was:
If explicitly return a CompositeType in {{udf.getResultType}}, will result in
some failures in chained operators.
For example: consider a simple UDF,
{code:scala}
object Func extends ScalarFunction {
def eval(row: Row): Row = {
row
}
override def getParameterTypes(signature: Array[Class[_]]):
Array[TypeInformation[_]] =
Array(Types.ROW(Types.INT))
override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
Types.ROW(Types.INT)
}
{code}
This should work perfectly since it's just a simple pass through, however
{code:scala}
@Test
def testRowType(): Unit = {
val data = List(
Row.of(Row.of(12.asInstanceOf[Integer]), "1")
)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromCollection(data)(Types.ROW(Types.ROW(Types.INT),
Types.STRING))
val tEnv = TableEnvironment.getTableEnvironment(env)
val table = stream.toTable(tEnv, 'a, 'b)
tEnv.registerFunction("func", Func20)
tEnv.registerTable("t", table)
// This works perfectly
val result1 = tEnv.sqlQuery("SELECT func(a) FROM t").toAppendStream[Row]
result1.addSink(new StreamITCase.StringSink[Row])
// This throws exception
val result2 = tEnv.sqlQuery("SELECT func(a) as myRow FROM
t").toAppendStream[Row]
result2.addSink(new StreamITCase.StringSink[Row])
env.execute()
}
{code}
Exception code:
{code:java}
java.lang.IndexOutOfBoundsException: index (1) must be less than size (1)
at
com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:310)
at
com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:293)
at
com.google.common.collect.SingletonImmutableList.get(SingletonImmutableList.java:41)
at
org.apache.calcite.sql.type.InferTypes$2.inferOperandTypes(InferTypes.java:83)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1777)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:459)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.expandStar(SqlValidatorImpl.java:349)
...
{code}
This is due to the fact that Calcite inferOperandTypes does not expect to infer
a struct RelDataType.
> Fix Composite getResultType of UDF cannot be chained with other operators
> -------------------------------------------------------------------------
>
> Key: FLINK-10019
> URL: https://issues.apache.org/jira/browse/FLINK-10019
> Project: Flink
> Issue Type: Sub-task
> Reporter: Rong Rong
> Assignee: Rong Rong
> Priority: Major
>
> Explicitly returning a CompositeType from {{udf.getResultType}} results in
> failures in chained operators.
> For example: consider a simple UDF,
> {code:scala}
> object Func extends ScalarFunction {
> def eval(row: Row): Row = {
> row
> }
> override def getParameterTypes(signature: Array[Class[_]]):
> Array[TypeInformation[_]] =
> Array(Types.ROW(Types.INT))
> override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
> Types.ROW(Types.INT)
> }
> {code}
> This should work, since the UDF is a simple pass-through. However, the second query in the following test fails:
> {code:scala}
> @Test
> def testRowType(): Unit = {
> val data = List(
> Row.of(Row.of(12.asInstanceOf[Integer]), "1")
> )
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val stream = env.fromCollection(data)(Types.ROW(Types.ROW(Types.INT),
> Types.STRING))
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val table = stream.toTable(tEnv, 'a, 'b)
> tEnv.registerFunction("func", Func)
> tEnv.registerTable("t", table)
> // This works perfectly
> val result1 = tEnv.sqlQuery("SELECT func(a) FROM t").toAppendStream[Row]
> result1.addSink(new StreamITCase.StringSink[Row])
> // This throws exception
> val result2 = tEnv.sqlQuery("SELECT func(a) as myRow FROM t").toAppendStream[Row]
> result2.addSink(new StreamITCase.StringSink[Row])
> env.execute()
> }
> {code}
> Exception stack trace:
> {code:java}
> java.lang.IndexOutOfBoundsException: index (1) must be less than size (1)
> at
> com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:310)
> at
> com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:293)
> at
> com.google.common.collect.SingletonImmutableList.get(SingletonImmutableList.java:41)
> at
> org.apache.calcite.sql.type.InferTypes$2.inferOperandTypes(InferTypes.java:83)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1777)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:459)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.expandStar(SqlValidatorImpl.java:349)
> ...
> {code}
> This is because Calcite's {{InferTypes.inferOperandTypes}} does not expect to
> infer a struct {{RelDataType}}.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)