gustavodemorais commented on code in PR #26113:
URL: https://github.com/apache/flink/pull/26113#discussion_r1957125408


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/UnnestITCase.scala:
##########
@@ -379,4 +380,486 @@ class UnnestITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mo
     val expected = List("a,1", "a,2", "a,3")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
+
+  @TestTemplate
+  def testUnnestWithOrdinalityWithValuesStream(): Unit = {
+    val sqlQuery = "SELECT * FROM (VALUES('a')) CROSS JOIN UNNEST(ARRAY[1, 2, 3]) WITH ORDINALITY"
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List("a,1,1", "a,2,2", "a,3,3")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestArrayWithOrdinality(): Unit = {
+    val data = List(
+      (1, Array(12, 45)),
+      (2, Array(41, 5)),
+      (3, Array(18, 42))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery = """
+                     |SELECT a, number, ordinality 
+                     |FROM T CROSS JOIN UNNEST(b) WITH ORDINALITY AS t(number, ordinality)
+                     |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List("1,12,1", "1,45,2", "2,41,1", "2,5,2", "3,18,1", "3,42,2")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestFromTableWithOrdinality(): Unit = {
+    val data = List(
+      (1, 1L, Array("Hi", "w")),
+      (2, 2L, Array("Hello", "k")),
+      (3, 2L, Array("Hello world", "x"))
+    )
+
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b, 'c)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery = "SELECT a, s, o FROM T, UNNEST(T.c) WITH ORDINALITY as A (s, o)"
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List("1,Hi,1", "1,w,2", "2,Hello,1", "2,k,2", "3,Hello world,1", "3,x,2")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestArrayOfArrayWithOrdinality(): Unit = {
+    val data = List(
+      (1, Array(Array(1, 2), Array(3))),
+      (2, Array(Array(4, 5), Array(6, 7))),
+      (3, Array(Array(8)))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'id, 'nested_array)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery =
+      """
+        |SELECT id, array_val, array_pos, `element`, element_pos
+        |FROM T
+        |CROSS JOIN UNNEST(nested_array) WITH ORDINALITY AS A(array_val, array_pos)
+        |CROSS JOIN UNNEST(array_val) WITH ORDINALITY AS B(`element`, element_pos)
+        |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "1,[1, 2],1,1,1",
+      "1,[1, 2],1,2,2",
+      "1,[3],2,3,1",
+      "2,[4, 5],1,4,1",
+      "2,[4, 5],1,5,2",
+      "2,[6, 7],2,6,1",
+      "2,[6, 7],2,7,2",
+      "3,[8],1,8,1")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestMultisetWithOrdinality(): Unit = {
+    val data = List(
+      (1, 1, "Hi"),
+      (1, 2, "Hello"),
+      (2, 2, "World"),
+      (3, 3, "Hello world")
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b, 'c)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery =
+      """
+        |WITH T1 AS (SELECT a, COLLECT(c) as words FROM T GROUP BY a)
+        |SELECT a, word, pos
+        |FROM T1 CROSS JOIN UNNEST(words) WITH ORDINALITY AS A(word, pos)
+        |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
+    val sink = new TestingRetractSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "1,Hi,1",
+      "1,Hello,2",
+      "2,World,1",
+      "3,Hello world,1"
+    )
+    assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestMapWithOrdinality(): Unit = {
+    val data = List(
+      Row.of(
+        Int.box(1), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("a", "10")
+          map.put("b", "11")
+          map
+        }),
+      Row.of(
+        Int.box(2), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("c", "20")
+          map.put("d", "21")
+          map
+        })
+    )
+
+    implicit val typeInfo = Types.ROW(
+      Array("id", "map_data"),
+      Array[TypeInformation[_]](Types.INT, Types.MAP(Types.STRING, Types.STRING))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'id, 'map_data)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery = """
+                     |SELECT id, k, v, pos
+                     |FROM T CROSS JOIN UNNEST(map_data) WITH ORDINALITY AS f(k, v, pos)
+                     |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val resultsWithoutordinality = assertAndRemoveOrdinality(sink.getAppendResults, 2)
+    val expected = List("1,a,10", "1,b,11", "2,c,20", "2,d,21")
+
+    assertThat(resultsWithoutordinality.sorted).isEqualTo(expected.sorted)
+  }
+
+  def testUnnestForMapOfRowsWitOrdinality(): Unit = {
+    val data = List(
+      Row.of(
+        Int.box(1), {
+          val map = new java.util.HashMap[Row, Row]()
+          map.put(Row.of("a", "a"), Row.of(10: Integer))
+          map.put(Row.of("b", "b"), Row.of(11: Integer))
+          map
+        }),
+      Row.of(
+        Int.box(2), {
+          val map = new java.util.HashMap[Row, Row]()
+          map.put(Row.of("c", "c"), Row.of(20: Integer))
+          map
+        }),
+      Row.of(
+        Int.box(3), {
+          val map = new java.util.HashMap[Row, Row]()
+          map.put(Row.of("d", "d"), Row.of(30: Integer))
+          map.put(Row.of("e", "e"), Row.of(31: Integer))
+          map
+        })
+    )
+
+    implicit val typeInfo = Types.ROW(
+      Array("a", "b"),
+      Array[TypeInformation[_]](
+        Types.INT,
+        Types.MAP(Types.ROW(Types.STRING, Types.STRING), Types.ROW(Types.INT())))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery = "SELECT a, k, v, o FROM T CROSS JOIN UNNEST(b) WITH ORDINALITY as f (k, v, o)"
+    val result = tEnv.sqlQuery(sqlQuery)
+    TestSinkUtil.addValuesSink(
+      tEnv,
+      "MySink",
+      List("a", "k", "v", "o"),
+      List(
+        DataTypes.INT,
+        DataTypes.ROW(DataTypes.STRING(), DataTypes.STRING()),
+        DataTypes.ROW(DataTypes.INT()),
+        DataTypes.INT.notNull()),
+      ChangelogMode.all()
+    )
+    result.executeInsert("MySink").await()
+
+    val expected =
+      List("1,a,a,10", "1,b,b,11", "2,c,c,20", "3,d,d,30", "3,e,e,31")
+    val resultWithoutOrd = assertAndRemoveOrdinality(
+      TestValuesTableFactory.getResultsAsStrings("MySink").sorted.toList,
+      2)
+    assertThat(resultWithoutOrd).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestWithOrdinalityForChainOfArraysAndMaps(): Unit = {
+    val data = List(
+      Row.of(
+        Int.box(1),
+        Array("a", "b"), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("x", "10")
+          map.put("y", "20")
+          map
+        }),
+      Row.of(
+        Int.box(2),
+        Array("c", "d"), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("z", "30")
+          map.put("w", "40")
+          map
+        })
+    )
+
+    implicit val typeInfo = Types.ROW(
+      Array("id", "array_data", "map_data"),
+      Array[TypeInformation[_]](
+        Types.INT,
+        Types.OBJECT_ARRAY(Types.STRING),
+        Types.MAP(Types.STRING, Types.STRING))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'id, 'array_data, 'map_data)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery =
+      """
+        |SELECT id, array_val, array_pos, map_key, map_val, map_pos
+        |FROM T
+        |CROSS JOIN UNNEST(array_data) WITH ORDINALITY AS A(array_val, array_pos)
+        |CROSS JOIN UNNEST(map_data) WITH ORDINALITY AS B(map_key, map_val, map_pos)
+        |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val resultsWithoutOrdinality = assertAndRemoveOrdinality(sink.getAppendResults, 2)
+    val expected = List(
+      "1,a,1,x,10",
+      "1,a,1,y,20",
+      "1,b,2,x,10",
+      "1,b,2,y,20",
+      "2,c,1,z,30",
+      "2,c,1,w,40",
+      "2,d,2,z,30",
+      "2,d,2,w,40"
+    )
+
+    assertThat(resultsWithoutOrdinality.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestWithOrdinalityForEmptyArray(): Unit = {
+    val data = List(
+      (1, Array[Int]())
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery = """
+                     |SELECT a, number, ordinality
+                     |FROM T CROSS JOIN UNNEST(b) WITH ORDINALITY AS t(number, ordinality)
+                     |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List()
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected)
+  }
+
+  @TestTemplate
+  def testUnnestWithOrdinalityForMapWithNullValues(): Unit = {
+    val data = List(
+      Row.of(
+        Int.box(1), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("a", "10")
+          map.put("b", null)
+          map
+        }),
+      Row.of(
+        Int.box(2), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("c", "20")
+          map.put("d", null)
+          map
+        })
+    )
+
+    implicit val typeInfo = Types.ROW(
+      Array("id", "map_data"),
+      Array[TypeInformation[_]](Types.INT, Types.MAP(Types.STRING, Types.STRING))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'id, 'map_data)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery =
+      """
+        |SELECT id, k, v, pos
+        |FROM T CROSS JOIN UNNEST(map_data) WITH ORDINALITY AS f(k, v, pos)
+        |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val resultsWithoutordinality = assertAndRemoveOrdinality(sink.getAppendResults, 2)
+    val expected = List("1,a,10", "1,b,null", "2,c,20", "2,d,null")
+    assertThat(resultsWithoutordinality.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestArrayOfRowsFromTableWithOrdinality(): Unit = {
+    val data = List(
+      (2, Array((20, "41.6"), (14, "45.2136"))),
+      (3, Array((18, "42.6")))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery =
+      "SELECT a, b, s, t, o FROM T, UNNEST(T.b) WITH ORDINALITY AS A(s, t, o)"
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "2,[20,41.6, 14,45.2136],20,41.6,1",
+      "2,[20,41.6, 14,45.2136],14,45.2136,2",
+      "3,[18,42.6],18,42.6,1")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestArrayOfRowsWithNestedFilterWithOrdinality(): Unit = {
+    val data = List(
+      (1, Array((12, "45.6"), (12, "45.612"))),
+      (2, Array((13, "41.6"), (14, "45.2136"))),
+      (3, Array((18, "42.6")))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("MyTable", t)
+
+    val sqlQuery =
+      """
+        |SELECT * FROM (
+        |   SELECT a, b1, b2, ord FROM
+        |       (SELECT a, b FROM MyTable) T
+        |       CROSS JOIN
+        |       UNNEST(T.b) WITH ORDINALITY as S(b1, b2, ord)
+        |       WHERE S.b1 >= 12
+        |   ) tmp
+        |WHERE b2 <> '42.6' AND ord <> 2
+    """.stripMargin
+
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List("1,12,45.6,1", "2,13,41.6,1")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestMultiSetOfRowsFromCollectResultWithOrdinality(): Unit = {
+    val data = List(
+      (1, (12, "45.6")),
+      (2, (12, "45.612")),
+      (2, (13, "41.6")),
+      (3, (14, "45.2136")),
+      (3, (18, "42.6")))
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery =
+      """
+        |WITH T1 AS (SELECT a, COLLECT(b) as `set` FROM T GROUP BY a)
+        |SELECT a, id, point, o FROM T1
+        |CROSS JOIN UNNEST(T1.`set`) WITH ORDINALITY AS A(id, point, o) WHERE a < 3
+      """.stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
+    val sink = new TestingRetractSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List("1,12,45.6,1", "2,12,45.612,1", "2,13,41.6,2")
+    assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)

Review Comment:
   See https://github.com/apache/flink/pull/26113#discussion_r1957125379



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to