[ https://issues.apache.org/jira/browse/FLINK-12848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16873903#comment-16873903 ]
aloyszhang commented on FLINK-12848:
------------------------------------
Hi [~jark]
I made a mistake in the flink-1.9 test because I only paid attention to the DEBUG
output of FlinkTypeFactory#buildLogicalRowType.
Actually, there is no problem with flink-1.9.
So let's look only at the flink-1.7 test below.
The code used in my test is as follows:
SimpleProcessionTimeSource.java
{code:java}
import javax.annotation.Nullable;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.DefinedProctimeAttribute;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

public class SimpleProcessionTimeSource
        implements StreamTableSource<Row>, DefinedProctimeAttribute {

    public static final String PROCTIME_NAME = "timestamp";

    private String[] fieldNames;
    private TypeInformation<?>[] typeInformations;
    private RowTypeInfo typeInfo;
    private TableSchema tableSchema;

    public SimpleProcessionTimeSource(String[] fieldNames,
            TypeInformation<?>[] typeInformations) {
        this.fieldNames = fieldNames;
        this.typeInformations = typeInformations;
        this.typeInfo = new RowTypeInfo(typeInformations, fieldNames);
        // The table schema carries one extra field: the proctime attribute.
        String[] schemaFields = new String[fieldNames.length + 1];
        TypeInformation<?>[] schemaTypes = new TypeInformation[typeInformations.length + 1];
        System.arraycopy(fieldNames, 0, schemaFields, 0, fieldNames.length);
        System.arraycopy(typeInformations, 0, schemaTypes, 0, typeInformations.length);
        schemaFields[fieldNames.length] = PROCTIME_NAME;
        schemaTypes[typeInformations.length] = Types.SQL_TIMESTAMP;
        this.tableSchema = new TableSchema(schemaFields, schemaTypes);
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv.addSource(new SimpleSourceFunction(), "pbSource", typeInfo);
    }

    @Override
    public TypeInformation<Row> getReturnType() {
        return typeInfo;
    }

    @Override
    public TableSchema getTableSchema() {
        return tableSchema;
    }

    @Override
    public String explainSource() {
        return "";
    }

    @Nullable
    @Override
    public String getProctimeAttribute() {
        return PROCTIME_NAME;
    }

    // Source that emits nothing; only the type information matters for this test.
    class SimpleSourceFunction implements SourceFunction<Row> {

        @Override
        public void run(SourceContext<Row> sourceContext) throws Exception {
        }

        @Override
        public void cancel() {
        }
    }
}{code}
Test code:
{code:java}
@Test
public void test001() {
    String[] fields = new String[]{"first", "second"};
    TypeInformation<?>[] types = new TypeInformation[]{
            Types.ROW_NAMED(new String[]{"first001"}, Types.INT),
            Types.ROW_NAMED(new String[]{"second002"}, Types.INT)
    };

    // build flink program
    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment env = StreamTableEnvironment.getTableEnvironment(execEnv);
    SimpleProcessionTimeSource streamTableSource = new SimpleProcessionTimeSource(fields, types);
    env.registerTableSource("testSource", streamTableSource);

    Table sourceTable = env.scan("testSource");
    System.out.println("Source table schema : ");
    sourceTable.printSchema();

    // selecting the nested fields; 'second002' cannot be resolved because the
    // second row type was registered with the first row's field name
    Table table = sourceTable.select("first.get('first001'), second.get('second002')");
    table.printSchema();

    try {
        execEnv.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
{code}
Test result:
{code:java}
Source table schema :
root
|-- first: Row(first001: Integer)
|-- second: Row(first001: Integer)
|-- timestamp: TimeIndicatorTypeInfo(proctime)
org.apache.flink.table.api.ValidationException: Expression 'second.get(second002)' failed on input check: Field name 'second002' could not be found.
at org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:156)
at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:97)
at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:84)
at org.apache.flink.table.plan.TreeNode.postOrderTransform(TreeNode.scala:72)
at org.apache.flink.table.plan.TreeNode$$anonfun$1.apply(TreeNode.scala:46)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
at org.apache.flink.table.plan.TreeNode.childrenTransform$1(TreeNode.scala:66)
at org.apache.flink.table.plan.TreeNode.postOrderTransform(TreeNode.scala:70)
at org.apache.flink.table.plan.logical.LogicalNode.org$apache$flink$table$plan$logical$LogicalNode$$expressionPostOrderTransform$1(LogicalNode.scala:132)
at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$7$$anonfun$apply$1.apply(LogicalNode.scala:145)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$7.apply(LogicalNode.scala:144)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
at org.apache.flink.table.plan.logical.LogicalNode.expressionPostOrderTransform(LogicalNode.scala:150)
at org.apache.flink.table.plan.logical.LogicalNode.validate(LogicalNode.scala:84)
at org.apache.flink.table.plan.logical.Project.validate(operators.scala:73)
at org.apache.flink.table.api.Table.select(table.scala:138)
at org.apache.flink.table.api.Table.select(table.scala:156)
at org.apache.flink.streaming.test.ScanTypeTest.test001(ScanTypeTest.java:48)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
{code}
This problem is caused by FlinkTypeFactory#buildLogicalRowType, which calls
FlinkTypeFactory#createTypeFromTypeInfo.
While converting types, FlinkTypeFactory#createTypeFromTypeInfo updates `seenTypes`,
whose type is `mutable.HashMap[(TypeInformation[_], Boolean), RelDataType]`, i.e. a
cache keyed by a Tuple2 of (TypeInformation[_], nullability).
This key cannot distinguish nested RowTypeInfo types like the ones in the test code.
After the first type `Types.ROW_NAMED(new String[]\{"first001"}, Types.INT)` is
processed, `seenTypes` holds an entry keyed by Tuple2(that RowTypeInfo, true). When
the second type `Types.ROW_NAMED(new String[]\{"second002"}, Types.INT)` is
processed, the lookup matches the existing key, because RowTypeInfo#equals ignores
field names, and returns the RelDataType cached for the first type. The result is a
RelDataType with the wrong field name for the second type.
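The collision is easy to reproduce in isolation: on flink-1.7.2, two RowTypeInfo
instances that differ only in their field names compare as equal and share a hash
code, so a HashMap keyed by the type information cannot tell them apart. A minimal
check (assuming only flink-core on the classpath; the class name here is made up
for illustration):
{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

public class RowTypeInfoEqualsCheck {

    public static void main(String[] args) {
        // The two nested row types from the test above: same field types,
        // different field names.
        RowTypeInfo first = new RowTypeInfo(
                new TypeInformation<?>[]{Types.INT}, new String[]{"first001"});
        RowTypeInfo second = new RowTypeInfo(
                new TypeInformation<?>[]{Types.INT}, new String[]{"second002"});

        // Both lines print 'true' on flink-1.7.2, so the seenTypes cache
        // treats the two row types as the same key.
        System.out.println(first.equals(second));
        System.out.println(first.hashCode() == second.hashCode());
    }
}
{code}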
> Method equals() in RowTypeInfo should consider fieldsNames
> ----------------------------------------------------------
>
> Key: FLINK-12848
> URL: https://issues.apache.org/jira/browse/FLINK-12848
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Affects Versions: 1.7.2
> Reporter: aloyszhang
> Assignee: aloyszhang
> Priority: Major
> Labels: pull-request-available
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> Since `RowTypeInfo#equals()` does not consider the fieldNames, field-name
> errors may occur when processing data with a RowTypeInfo type:
> {code:java}
> String[] fields = new String[]{"first", "second"};
> TypeInformation<?>[] types = new TypeInformation[]{
>         Types.ROW_NAMED(new String[]{"first001"}, Types.INT),
>         Types.ROW_NAMED(new String[]{"second002"}, Types.INT)};
> StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment env = StreamTableEnvironment.getTableEnvironment(execEnv);
> SimpleProcessionTimeSource streamTableSource = new SimpleProcessionTimeSource(fields, types);
> env.registerTableSource("testSource", streamTableSource);
> Table sourceTable = env.scan("testSource");
> System.out.println("Source table schema : ");
> sourceTable.printSchema();
> {code}
> The table schema will be:
> {code:java}
> Source table schema :
> root
> |-- first: Row(first001: Integer)
> |-- second: Row(first001: Integer)
> |-- timestamp: TimeIndicatorTypeInfo(proctime)
> {code}
> The second field has the same name as the first field.
> So, we should consider the field names in RowTypeInfo#equals().
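> A minimal sketch of what a name-aware `equals()`/`hashCode()` pair could look
> like (illustrative only, not necessarily the final patch; it assumes
> RowTypeInfo's existing `fieldNames` field and `canEqual()` from flink-core,
> plus `java.util.Arrays`):
> {code:java}
> @Override
> public boolean equals(Object obj) {
>     if (obj instanceof RowTypeInfo) {
>         final RowTypeInfo other = (RowTypeInfo) obj;
>         // compare the element types via the parent class, then the field names
>         return other.canEqual(this)
>                 && super.equals(other)
>                 && Arrays.equals(fieldNames, other.fieldNames);
>     }
>     return false;
> }
>
> @Override
> public int hashCode() {
>     // keep hashCode consistent with the name-aware equals
>     return 31 * super.hashCode() + Arrays.hashCode(fieldNames);
> }
> {code}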