Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4726#discussion_r142238516
--- Diff:
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
---
@@ -210,6 +213,31 @@ class CorrelateITCase extends
StreamingMultipleProgramsTestBase {
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+ @Test
+ def testRowType(): Unit = {
+ val row = Row.of(
+ 12.asInstanceOf[Integer],
+ true.asInstanceOf[JBoolean],
+ Row.of(1.asInstanceOf[Integer], 2.asInstanceOf[Integer],
3.asInstanceOf[Integer])
+ )
+
+ val rowType = Types.ROW(Types.INT, Types.BOOLEAN, Types.ROW(Types.INT,
Types.INT, Types.INT))
+ val in = env.fromElements(row, row)(rowType).toTable(tEnv).as('a, 'b,
'c)
+
+ val tableFunc4 = new TableFunc4()
+ val result = in
+ .join(tableFunc4('c) as ('f0, 'f1, 'f2))
+ .select('c, 'f2)
+
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1,2,3,3",
--- End diff --
Is this the correct result? Shouldn't `'c` remain nested? We did not ask to
flatten it.
---