[
https://issues.apache.org/jira/browse/FLINK-15566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jeff Zhang updated FLINK-15566:
-------------------------------
Description:
I don't know why Flink reorders the fields (see the link below), but this causes my
user-defined function to behave incorrectly when I use a POJO in my UDF and override getResultType.
[https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java#L85]
Here's the udf I define.
{code:java}
%flink
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.typeutils._
import org.apache.flink.api.scala.typeutils._
import org.apache.flink.api.scala._
/**
 * Record type used as the element type of ParseFunction.
 *
 * Every member is an immutable constructor `val`: `age`, `duration`, `campaign`,
 * `pdays` and `previous` are Ints; all other members are Strings.
 */
class Person(
  val age: Int,
  val job: String,
  val marital: String,
  val education: String,
  val default: String,
  val balance: String,
  val housing: String,
  val loan: String,
  val contact: String,
  val day: String,
  val month: String,
  val duration: Int,
  val campaign: Int,
  val pdays: Int,
  val previous: Int,
  val poutcome: String,
  val y: String
)
/**
 * Table function that parses one ';'-separated input line into a Person.
 *
 * The header line (the one starting with the quoted column name "age") produces
 * no output; every other line is split on ';' and emitted as one Person.
 */
class ParseFunction extends TableFunction[Person] {

  /**
   * Parses `line` and emits at most one Person through `collect`.
   *
   * NOTE: a line with fewer than 17 columns, or a non-numeric value in an Int
   * column, still fails with an exception; no validation is added here.
   *
   * @param line one raw input line
   */
  def eval(line: String): Unit = {
    // Skip the header row, which starts with the quoted column name "age".
    if (!line.startsWith("\"age\"")) {
      val tokens = line.split(";")
      // Person's numeric members are plain Int, so the parsed values are passed
      // directly instead of going through the deprecated `new Integer(...)`.
      collect(new Person(
        age = tokens(0).toInt,
        job = normalize(tokens(1)),
        marital = normalize(tokens(2)),
        education = normalize(tokens(3)),
        default = normalize(tokens(4)),
        balance = normalize(tokens(5)),
        housing = normalize(tokens(6)),
        loan = normalize(tokens(7)),
        contact = normalize(tokens(8)),
        day = normalize(tokens(9)),
        month = normalize(tokens(10)),
        duration = tokens(11).toInt,
        campaign = tokens(12).toInt,
        pdays = tokens(13).toInt,
        previous = tokens(14).toInt,
        poutcome = normalize(tokens(15)),
        y = normalize(tokens(16))))
    }
  }

  /**
   * Declares the result type by hand as a PojoTypeInfo over Person's fields.
   *
   * NOTE: the fields are listed in constructor order, but as this issue reports,
   * PojoTypeInfo does not keep that order and reorders the fields it is given.
   *
   * @return the PojoTypeInfo describing Person
   */
  override def getResultType(): PojoTypeInfo[Person] = {
    val cls = classOf[Person]
    // Small local builders so each field is declared once, by name and type.
    def intField(name: String) = new PojoField(cls.getDeclaredField(name), Types.INT)
    def stringField(name: String) = new PojoField(cls.getDeclaredField(name), Types.STRING)

    new PojoTypeInfo[Person](cls, java.util.Arrays.asList(
      intField("age"),
      stringField("job"),
      stringField("marital"),
      stringField("education"),
      stringField("default"),
      stringField("balance"),
      stringField("housing"),
      stringField("loan"),
      stringField("contact"),
      stringField("day"),
      stringField("month"),
      intField("duration"),
      intField("campaign"),
      intField("pdays"),
      intField("previous"),
      stringField("poutcome"),
      stringField("y")
    ))
  }

  /**
   * Strips one pair of surrounding double quotes from `token`, if present.
   *
   * Tokens that are not fully quoted, including a lone `"`, are returned
   * unchanged rather than throwing or losing their last character.
   */
  private def normalize(token: String): String =
    if (token.length >= 2 && token.startsWith("\"") && token.endsWith("\"")) {
      token.substring(1, token.length - 1)
    } else {
      token
    }
}{code}
I then use this UDF in SQL but get the wrong result, because Flink
implicitly reorders the fields.
!image-2020-01-13-16-02-57-949.png!
was:
I don't know why Flink reorders the fields (see the link below), but this causes my
user-defined function to behave incorrectly when I use a POJO in my UDF and override getResultType.
[https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java#L85]
Here's the udf I define.
{code:java}
%flink
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.typeutils._
import org.apache.flink.api.scala.typeutils._
import org.apache.flink.api.scala._
/**
 * Record type used as the element type of ParseFunction.
 *
 * Every member is an immutable constructor `val`: `age`, `duration`, `campaign`,
 * `pdays` and `previous` are Ints; all other members are Strings.
 */
class Person(
  val age: Int,
  val job: String,
  val marital: String,
  val education: String,
  val default: String,
  val balance: String,
  val housing: String,
  val loan: String,
  val contact: String,
  val day: String,
  val month: String,
  val duration: Int,
  val campaign: Int,
  val pdays: Int,
  val previous: Int,
  val poutcome: String,
  val y: String
)
/**
 * Table function declared with element type Person that handles one
 * ';'-separated input line per call to eval.
 */
class ParseFunction extends TableFunction[Person] {
// Splits the line on ';' and collects one record, unless the line is the header.
def eval(line: String) {
val tokens = line.split(";")
// parse the line
// Lines starting with the quoted column name "age" are treated as the header and skipped.
if (!line.startsWith("\"age\"")) {
// NOTE(review): this revision passes a Row to collect although the function is
// declared as TableFunction[Person]; presumably this is why the description was updated.
collect(Row.of(new Integer(tokens(0).toInt), normalize(tokens(1)),
normalize(tokens(2)), normalize(tokens(3)), normalize(tokens(4)),
normalize(tokens(5)), normalize(tokens(6)), normalize(tokens(7)),
normalize(tokens(8)), normalize(tokens(9)), normalize(tokens(10)), new
Integer(tokens(11).toInt), new Integer(tokens(12).toInt),
new Integer(tokens(13).toInt), new Integer(tokens(14).toInt),
normalize(tokens(15)), normalize(tokens(16))))
}
}
// Builds the PojoTypeInfo for Person by hand, listing the fields in constructor order.
override def getResultType() = {
val cls = classOf[Person]
new PojoTypeInfo[Person](classOf[Person], java.util.Arrays.asList(
new PojoField(cls.getDeclaredField("age"), Types.INT),
new PojoField(cls.getDeclaredField("job"), Types.STRING),
new PojoField(cls.getDeclaredField("marital"), Types.STRING),
new PojoField(cls.getDeclaredField("education"), Types.STRING),
new PojoField(cls.getDeclaredField("default"), Types.STRING),
new PojoField(cls.getDeclaredField("balance"), Types.STRING),
new PojoField(cls.getDeclaredField("housing"), Types.STRING),
new PojoField(cls.getDeclaredField("loan"), Types.STRING),
new PojoField(cls.getDeclaredField("contact"), Types.STRING),
new PojoField(cls.getDeclaredField("day"), Types.STRING),
new PojoField(cls.getDeclaredField("month"), Types.STRING),
new PojoField(cls.getDeclaredField("duration"), Types.INT),
new PojoField(cls.getDeclaredField("campaign"), Types.INT),
new PojoField(cls.getDeclaredField("pdays"), Types.INT),
new PojoField(cls.getDeclaredField("previous"), Types.INT),
new PojoField(cls.getDeclaredField("poutcome"), Types.STRING),
new PojoField(cls.getDeclaredField("y"), Types.STRING)
))
}
// Removes the surrounding quotes: a token that starts with a double quote loses
// its first and last character; any other token is returned unchanged.
private def normalize(token: String) = {
if (token.startsWith("\"")) {
token.substring(1, token.length - 1)
} else {
token
}
}
}{code}
I then use this UDF in SQL but get the wrong result, because Flink
implicitly reorders the fields.
!image-2020-01-13-16-02-57-949.png!
> Flink implicitly order the fields in PojoTypeInfo
> -------------------------------------------------
>
> Key: FLINK-15566
> URL: https://issues.apache.org/jira/browse/FLINK-15566
> Project: Flink
> Issue Type: Improvement
> Components: API / Core
> Affects Versions: 1.10.0
> Reporter: Jeff Zhang
> Priority: Major
> Attachments: image-2020-01-13-16-02-57-949.png
>
>
> I don't know why Flink reorders the fields (see the link below), but this causes my
> user-defined function to behave incorrectly when I use a POJO in my UDF and override getResultType.
> [https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java#L85]
>
> Here's the udf I define.
>
> {code:java}
> %flink
> import org.apache.flink.api.java.typeutils.RowTypeInfo
> import org.apache.flink.api.common.typeinfo.Types
> import org.apache.flink.api.java.typeutils._
> import org.apache.flink.api.scala.typeutils._
> import org.apache.flink.api.scala._
> /**
>  * Record type used as the element type of ParseFunction.
>  *
>  * Every member is an immutable constructor `val`: `age`, `duration`, `campaign`,
>  * `pdays` and `previous` are Ints; all other members are Strings.
>  */
> class Person(
>   val age: Int,
>   val job: String,
>   val marital: String,
>   val education: String,
>   val default: String,
>   val balance: String,
>   val housing: String,
>   val loan: String,
>   val contact: String,
>   val day: String,
>   val month: String,
>   val duration: Int,
>   val campaign: Int,
>   val pdays: Int,
>   val previous: Int,
>   val poutcome: String,
>   val y: String
> )
> /**
>  * Table function that parses one ';'-separated input line into a Person.
>  *
>  * The header line (the one starting with the quoted column name "age") produces
>  * no output; every other line is split on ';' and emitted as one Person.
>  */
> class ParseFunction extends TableFunction[Person] {
>
>   /**
>    * Parses `line` and emits at most one Person through `collect`.
>    *
>    * NOTE: a line with fewer than 17 columns, or a non-numeric value in an Int
>    * column, still fails with an exception; no validation is added here.
>    *
>    * @param line one raw input line
>    */
>   def eval(line: String): Unit = {
>     // Skip the header row, which starts with the quoted column name "age".
>     if (!line.startsWith("\"age\"")) {
>       val tokens = line.split(";")
>       // Person's numeric members are plain Int, so the parsed values are passed
>       // directly instead of going through the deprecated `new Integer(...)`.
>       collect(new Person(
>         age = tokens(0).toInt,
>         job = normalize(tokens(1)),
>         marital = normalize(tokens(2)),
>         education = normalize(tokens(3)),
>         default = normalize(tokens(4)),
>         balance = normalize(tokens(5)),
>         housing = normalize(tokens(6)),
>         loan = normalize(tokens(7)),
>         contact = normalize(tokens(8)),
>         day = normalize(tokens(9)),
>         month = normalize(tokens(10)),
>         duration = tokens(11).toInt,
>         campaign = tokens(12).toInt,
>         pdays = tokens(13).toInt,
>         previous = tokens(14).toInt,
>         poutcome = normalize(tokens(15)),
>         y = normalize(tokens(16))))
>     }
>   }
>
>   /**
>    * Declares the result type by hand as a PojoTypeInfo over Person's fields.
>    *
>    * NOTE: the fields are listed in constructor order, but as this issue reports,
>    * PojoTypeInfo does not keep that order and reorders the fields it is given.
>    *
>    * @return the PojoTypeInfo describing Person
>    */
>   override def getResultType(): PojoTypeInfo[Person] = {
>     val cls = classOf[Person]
>     // Small local builders so each field is declared once, by name and type.
>     def intField(name: String) = new PojoField(cls.getDeclaredField(name), Types.INT)
>     def stringField(name: String) = new PojoField(cls.getDeclaredField(name), Types.STRING)
>
>     new PojoTypeInfo[Person](cls, java.util.Arrays.asList(
>       intField("age"),
>       stringField("job"),
>       stringField("marital"),
>       stringField("education"),
>       stringField("default"),
>       stringField("balance"),
>       stringField("housing"),
>       stringField("loan"),
>       stringField("contact"),
>       stringField("day"),
>       stringField("month"),
>       intField("duration"),
>       intField("campaign"),
>       intField("pdays"),
>       intField("previous"),
>       stringField("poutcome"),
>       stringField("y")
>     ))
>   }
>
>   /**
>    * Strips one pair of surrounding double quotes from `token`, if present.
>    *
>    * Tokens that are not fully quoted, including a lone `"`, are returned
>    * unchanged rather than throwing or losing their last character.
>    */
>   private def normalize(token: String): String =
>     if (token.length >= 2 && token.startsWith("\"") && token.endsWith("\"")) {
>       token.substring(1, token.length - 1)
>     } else {
>       token
>     }
> }{code}
> I then use this UDF in SQL but get the wrong result, because Flink
> implicitly reorders the fields.
> !image-2020-01-13-16-02-57-949.png!
--
This message was sent by Atlassian Jira
(v8.3.4#803005)